RabbitMQ学习笔记

分布式架构:一个请求由多个服务(服务或者系统)协同处理完成。

RabbitMQ:消息队列,或者说是一个消息中间键。

MQ功能:

  1. 削峰:不让系统直接访问系统,而是通过MQ,然后交给MQ去访问系统,使得访问的人员进行排队。(有排队就会导致比较慢,但是安全有次序。)
  2. 应用解耦:通过队列去访问系统,各个系统之间通过MQ去完成,也就会不会产生过多故障。
  3. 异步处理:A-B,并不要A收到B的确定消息才会执行操作。

MQ的分类:

  1. RabbitMQ: 和spring是一家公司的,使用AMQP(高级消息队列协议)基础上完成的,支持多种语言,缺点是商业版要付费。

  2. kafka:为大数据而生,有百万级TPS的,实时计算,吞吐量高,使用在日志采集等方面。缺点是,单机超过64个队列/分区,load会发生明显的飙高现象,队列越多,load越高,发送响应时间越长。消费失败不支持重试,支持消息顺序,但一代代理宕机后,会产生消息乱序,社区更新慢。

  3. RocketMQ:由阿里开发,金融行业需要,使用java语言实现,参考了kafka,一般常用于订单,交易,充值,流计算,消息推送,日志处理,bingblog分发等场景。

    1. 优点:单机吞吐10w级别,信息可以做到0丢失,MQ功能比较完善,还是分布式,拓展性号,支持10亿级别的消息堆积,不会因为堆积而导致性能下降。
    2. 缺点:这次客户端语言不多,目前是java和C++,其中C++还不成熟,社区活跃一般。

初级部分:

RabbitMQ:是一个消息中间件,可以当做一个快递站,快递员发包裹到快递站,然后本地快递员将快递发给用户。

生产者:生产者 - > mq (一个交换机可以绑定多个队列) -> 消费者,一般情况下是一个队列对应一个消费者,如果两个一个队列对应两个消费者,那么就只有一个消费者收到消息,这就想当与只有一个包裹是不可能发给两个人的。

核心部分:

  1. 简单模式
  2. 工作模式
  3. 发布订阅模式
  4. 路由模式
  5. 主题模式
  6. 发布确认模式

connection:publisher/consumer 和 broker之间的TCP连接。

Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是巨大的,效率也比较低。channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。

安装rabbitMQ:

官网:https://www.rabbitmq.com

下载页面:https://www.rabbitmq.com/download.html

我一般使用docker安装

1
2
3
4
# latest RabbitMQ 3.10
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management


其他安装方式:

  1. 首先现在linux下的压缩包,或者使用工具将安装包上传到linux系统。
  1. 安装rabbitMQ的支持依赖

    1. rpm -ivh erlang-21.3-1.el7.x86_64.rpm
    2. yml install socat -y
    3. rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
  2. 添加常用命令:

    1. 添加开机启动chkconfig rabbitmq-server on

    2. 启动服务

      1. 找到安装路径

      2. sbin下的service 里边的rabbitmq-server start /sbin/service rabbitmq-server start

      3. 查看服务状态/sbin/service rabbitmq-server status

      4. 停止服务/sbin/service rabbitmq-server stop

      5. 开启后台web管理插件rabbitmq-plugins enable rabbitmq_management(docker安装的不要这么操作)

      6. 使用账号访问,一般是账号密码都是guest,当然你必须保证你的端口号开放,如果云服务器的话,15972,注意ip地址,如果有域名的话,那么可以在域名后添加:15672直接访问.

      7. 如果不能直接访问的话,检查一下防火墙,systemctl stop firewalld,这个命令关闭本次防火墙,systemctl enable firewalld可以下次开机也关闭防火墙,systemctl status firewalld可以查看当前系统防护墙状态

      8. 添加一个新用户,如果是使用虚拟机的话,一般是不能直接使用guest登录的,当然这个我没碰到过

        1. 创建一个账号rabbitmqctl add_user 用户名 密码

        2. 设置用户角色 rabbitmqctl set_user_tags admin administractor

        3. 设置用户权限

          set_permission[-p <vhostpath>] <user> <conf> <write> <read>

          举例:

          rabbitmqctl set_permission -p "/" admin ".*" ".*" ".*"用户user_admin具有/vhost1这个vritual host中的所有资源的配置,写,读权限。

        4. 当前用户和角色

          rabbitmqctl list_user

如何使用MQ(java代码实现)

IDEA新建maven工程导入依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<!--指定jdk版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.7</version>
</dependency>
</dependencies>

生产者和消费者:

生产者 - 消息队列 - 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/*
生产者,目标是发消息
*/
public class Product {
//队列名称

public static final String QUEUE_NAME="hello";

//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//建立一个连接方式
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ的队列
factory.setHost("www.littlehei.fun");
//用户名
factory.setUsername("guest");

//密码
factory.setPassword("guest");

//创建链接
Connection connection = factory.newConnection();

//获取信道
Channel channel = connection.createChannel();

//生成一个队列
/*
生成队列
参数1,队列名称
参数2,队列里边的消息是否持久化(磁盘)默认情况消息存储在内存中
参数3,该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false:只能一个消费者消费
参数4,是否自动删除,最后一个消费者断开链接以后,该队是否自动删除,true,自动删除,false不删除
参数5,其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发消息
String message = "hello world";//初次使用

/*
发送一个消费
参数1,发送到哪个交换机
参数2.路由的key值是哪个? 本次是队列的名称
参数3,其他参数配置
参数4,发送消息的消息体
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送成功");
}

}

消费者:用来接受生成者产生的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/*
消费者,用来消费生成者产生的代码
*/
public class Consumer {
//队列的名称:
private static final String QUEUE_NAME="hello";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("www.littlehei.fun");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//声明
DeliverCallback deliverCallback = (consumerTag,message)->{
// String message = new String("自己手动去创建一个消息,但是不推荐");
System.out.println(new String(message.getBody()));
};

//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};

/*
消费者接收消息
参数1,消费哪个队列
参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
参数3,消费者未成功消费的回调
参数4,消费者取录消费的回到
*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

}
}

工作队列

生成者 —-大量发消息— 队列 — 接到消息—工作线程1或者工作线程2.。。。

==注意:一个消息只能被处理一次不能处理多次==

轮训处理消息,你一个,我下一个,他下下个。

不同工作线程之间的关系是==竞争==关系

创建链接工具类:

1
2
3
4
5
6
7
8
9
10
11
12
13
public class GetConnection {
//建立一个工具类,每次都直接使用,减少代码重复量
public static Channel getChannel() throws IOException, TimeoutException {
//创建一个链接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("www.littlehei.fun");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}

轮训分发代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/*
一个工作线程,可以多个创建,可以多线程创建,具体看你自己如何定于
*/
public class WorkThread1 {
//首先还是创建一个队列名称
public static final String QUEUE_NAME = "hello";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

/*
消费者接收消息
参数1,消费哪个队列
参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
参数3,消费者未成功消费的回调
参数4,消费者取录消费的回到
*/
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println("接收到的消息为" + new String(message.getBody()));
};

CancelCallback cancelCallback = (consumerTag)->{
System.out.println("消息被取消消费接口回调");
};
System.out.println("C1等待接收消息...");

//消息接收
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
生产者,可以发送大量消息
*/
public class Product1 {

//队列名称
public static final String QUEUE_NAEM="hello";

//发送大量消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//队列声明
channel.queueDeclare(QUEUE_NAEM,false,false,false,null);

//发送消息
//从控制台中输入
Scanner sc = new Scanner(System.in);
//判断是否有下一个消息输入

while (sc.hasNext()){
String name = sc.next();
channel.basicPublish("",QUEUE_NAEM,null,name.getBytes());
System.out.println("发送完成: "+name);
}

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
----------结果----------
nihap
发送完成nihap
wp1
发送完成wp1
ci1
发送完成ci1
wooda
发送完成wooda
我喜欢你
发送完成我喜欢你
------------------
C1等待接收消息...
接收到的消息为nihap
接收到的消息为ci1
接收到的消息为我喜欢你
--------------------
C2等待接收消息...
接收到的消息为wp1
接收到的消息为wooda

消息应答

为了防止消息在发送过程中不丢失,他是==指消费者收到消息并且处理该消息之后,告诉rabbitmq他已经处理完成了,rabbitmq可以把消息删除。==

自动应答:

需要在高吞吐量和数据传输安全性方面做权衡,这种模式如果消息在接受到之前,消费者那边出现链接或者channel关闭,那么消息就丢失了,当然如果另一方面这种模式消费者那边可以传递过载消息,没有对传递的消息数量进行限制,这样有可能导致消费者由于接收太多还来不及处理的消息,导致这些消息积压,最终导致内存耗尽,最终这些消费者线程被操作系统杀死,故而这种模式适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。(不靠谱)

手动应答

能用手动则用手动。手动方法

  1. Channel.basicAck(用于肯定确认)
  2. Channel.basicNack(用于否定确认)
  3. Channel.basicReject(用于否定确认),对比nack少了一个参数,不处理该消息了直接就可以拒绝,也可以将他丢弃。

批量应答(multiple):==推荐使用false==,当接受到一个消息的时候,也就是队列往信道里边放了n条数据,那么他会将此信道里边的所有消息都应答一次。

消息自动重新入队

如果消费者由于某些原因失去连接,或者说他的通道已经关闭,连接已经关闭或者TCP连接丢失,导致消息未发送ACK确认,此时RabbitMQ将了解到消息未完全处理,并将其重新排队,如果此时其他消费者可以处理,他将很快将其重新分发给另一个消费者,这样即使某个消费者偶尔死亡,也可以确保不会丢失任何信息。

个人理解:

也就是本来做这个任务的员工有事辞职了,公司为了完成任务交给另一个还在职的员工完成这个任务,直到这个任务完成,这样保证任务不被丢失。

代码实现:

消息在手动应答的时候是不应该被丢失的,且会重新入队。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
我们主要围绕的是手动应答进行处理,要是消息应答不丢失,放回队列
重新消费
*/
public class Task {
//老样子,队列名称
public static final String QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//声名队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//从控制台中获取
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息: "+ message);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
接收消息
*/
public class Work_Consume1 {

public static final String QUEUE_NAME = "ack_queue";


public static void main(String[] args) throws IOException, TimeoutException {
//接受消息
Channel channel = GetConnection.getChannel();

System.out.println("C1等待接受消息处理时间较短");

DeliverCallback deliverCallback = (consumerTag, message)->{
//看情况你给不给休眠
SleepUtils.sleep(1);

System.out.println("接收到消息" + new String(message.getBody(),"UTF-8"));
//进行手动应答
/*
参数1 , 消息的标记tag
参数2, 是否批量应答 false 不 ,true,批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
//手动接收消息


//采用手动应答
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag->{
System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
}));



}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
* @Title 休眠工具类
*/
public class SleepUtils {
public static void sleep(int second) {
//JUC里边的工具类

try {
Thread.sleep(1000*second);
//TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
//中断
Thread.currentThread().interrupt();
}
}

}

通过上述方式,我们可以知道消息队列在我们创建多个消费者的时候,是轮转来的,当然,假设有两个消费者,如果有一个宕机,那么消息队列就将本轮转给没宕机的那个消费者消费,==只要生产者有产生,无论如何要将消息全部消费==,不可丢弃。

Rabbit消息持久化

消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。

队列如何持久化

两个持久化操作都是在生产者中进行的。

我们需要将durable参数设置为持久化

1
2
3
//让队列持久化
boolean durable = true;
channel.queueDeclare(队列名,durable,false,false,null)

但是需要注意的是,==就是如果之前声明的队列不是持久化的,需要把原先队列先删除,然后重新创建一个持久化队列,不然就会出现错误==。

错误:

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg ‘durable’ for queue ‘ack_queue’ in vhost ‘/‘: received ‘true’ but current is ‘false’, class-id=50, method-id=10)

注意:

==持久化后的rabbitMQ重启之后队列消息还是会存在的,未持久化的,那么对不起,他没了。==

消息持久化

将消息标记为持久化并不能保证不会丢失消息,尽管他会告诉rabbitMQ将消息保存到磁盘中,但是这里仍然存在当消息刚刚存储池在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘,持久性保证并不强,但是对于我们简单任务队列而言,这已经就绰绰有余了。

MessageProperties.PERSISTENT_TEXT_PLAIN

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Task {
//老样子,队列名称
public static final String QUEUE_NAME = "ack_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//声名队列
//让队列持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME,durable,false,false,null);


//从控制台中获取
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();

//添加持久化
//设置生成者发送消息为持久化信息(要求保存到硬盘上)保存在内存中
//MessageProperties.PERSISTENT_TEXT_PLAIN,指令完成持久化
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
//未持久化时候的是
// channel.basicPublish("",QUEUE_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息: "+ message);
}
}

}

不公平分发:

在某些情况下轮训分发并不好用,具体例子:有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者2处理的速度却很慢,这个时候我们还采用轮训分发就会导致这个处理速度快的很大一部分时间处于空闲状态,而处理慢的那个消费者就一直在干活,这种分配方式就不是很好。

==能者多劳==,多劳多得。

为了避免这种情况,我们有采用不公平分发操作

设置参数channel.basicQos(1);

能者多劳是在消费者中设置的

1
2
3
//能者多劳
int prefetchCount = 1;
channel.basicQos(prefetchCount);

一般我们生活中采用的就是不公平分发。

预取值:

预先分配任务,比如生产者生成7条数据,通过队列分发,通过==预取值,预取值是多少,就给属于那条信道的消费者分配多少消息。和消费者处理消息的快慢无关。==

注意

预取值消耗完毕后,之后的值就按照那个消费者快给他分配的信息就多,谁慢,谁分配的信息就少。

预取值是洗发人员能限制缓冲区大小,避免缓冲区里边无限制的未确定消息问题。

需要注意的是,预取值并不是你直接输入多少条数据他就可以直接堆满的我们设置的预取值的,他可能由于消费费者处理速度影响,比如,你输入10条数据,但是设置预取值为5的最后值只会产生4条数据也说不准,因为另一个太快了,其他的数据都被他消耗完毕了。

1
2
3
4
5
//设置非公平分发
//int i = 1;
//预取值
int i= 5; //设置分发5个消息,也就是预取值为5
channel.basicQos(i);
1
2
3
4
//设置
//int i = 1; //不公平分发
int i = 2;//此时设置就是预取值了
channel.basicQos(i);

发布和确认

生产者 - - 发送消息 — 队列hello :apple:必须保存在磁盘上才能达到持久化操作。

  1. 设置要去队列必须持久化
  2. 设置要求队列中的消息必须持久化
  3. 发布确认

开启发布确认

单个确认发布:

每次生产者生产一个消息他都会确认一次,这样的好处就是,如果发送信息丢失可以很容易确定位置,缺点是慢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//单个确认
public static void publicMessageIndividually() throws IOException, TimeoutException, InterruptedException {
Channel channel = GetConnection.getChannel();

//队列的声明
String queueName = UUID.randomUUID().toString();

//使用信道对队列进行声明
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//单个消息就马上进行发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "单个消息" + (end-begin)+"毫秒");
}

批量确认发布:

批量发布对比单个发布速度快了很多,但是这个碰到问题去查找问题的时候就比较慢了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//批量发布
public static void publicMessageBatch() throws IOException, TimeoutException, InterruptedException {
Channel channel = GetConnection.getChannel();

//队列的声明
String queueName = UUID.randomUUID().toString();

//使用信道对队列进行声明
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//批量确认消息的大小
int batchSize = 10;
//未确认消息个数

//批量发消息,批量发布确认
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("",queueName,null,message.getBytes());
//单个消息就马上进行发布确认
boolean flag = channel.waitForConfirms();

//消息达到10确认一次
if(i%batchSize == 0){
channel.waitForConfirms();
}
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "发布100个消息" + (end-begin) +"毫秒");
}

异步确认发布

对比单步和批量这个就厉害多了,他可以异常进行,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。

寄快件的人疯狂发,然后到broker中选择哪些需要确认的进行确认。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//异步
public static void publishMessageAsync() throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//队列的声明
String queueName = UUID.randomUUID().toString();

//使用信道对队列进行声明
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();

//消息确认成功,回调函数
ConfirmCallback ackCallback = (delivery,multiple)->{
System.out.println("确认的消息:" + delivery);
};
//消息确认失败,回调函数
/*
1.消息的标记
2.消息是否为批量
*/
ConfirmCallback nackCallback = (delivery,multiple)->{
System.out.println("未确认的消息:" + delivery);
};

//在发消息的时候,你需要准备一个监听器,监听哪些消息成功了,哪些消息失效了
/*
1.监听哪些消息成功了
2.监听哪些消息失败了
*/
channel.addConfirmListener(ackCallback,nackCallback);

//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
channel.basicPublish("",queueName,null,message.getBytes());
//发布确认,这个时候要是进行发布确认就是同步了
}



//结束时间
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "异步发布确认消息,耗时" + (end-begin) +"毫秒");

}

这就出现一个问题,如何处理异步未确认消息

最好的解决的解决方案,就是把未确认的消息放到一个基于内存的能发布线程访问的队列,比如说ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//异步
public static void publishMessageAsync() throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//队列的声明
String queueName = UUID.randomUUID().toString();

//使用信道对队列进行声明
channel.queueDeclare(queueName,true,false,false,null);
//开启发布确认
channel.confirmSelect();

/*
线程安全有序的一个哈希表,适用于高并发的情况下的
1.轻松将序号和消息进行关联
2.轻松批量删除条目,只要给到序号
3.支持高并发(多线程)
*/
ConcurrentSkipListMap<Long,String> outstandingConfirms=
new ConcurrentSkipListMap<>();

//消息确认成功,回调函数
ConfirmCallback ackCallback = (delivery,multiple)->{
if(multiple){
//删除已经确认的消息,剩下的就是为确认的消息
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(delivery);
confirmed.clear();
}else{
outstandingConfirms.remove(delivery);
}
System.out.println("确认的消息:" + delivery);
};
//消息确认失败,回调函数
/*
1.消息的标记
2.消息是否为批量
*/
ConfirmCallback nackCallback = (delivery,multiple)->{
//3.打印一下未确认的消息都有哪些
String message = outstandingConfirms.get(delivery);
System.out.println("未确认的消息是:" + message+"未确认的标记:" + delivery);
};

//在发消息的时候,你需要准备一个监听器,监听哪些消息成功了,哪些消息失效了
/*
1.监听哪些消息成功了
2.监听哪些消息失败了
*/
channel.addConfirmListener(ackCallback,nackCallback); // 异步通知

//开始时间
long begin = System.currentTimeMillis();


//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "消息" + i;
channel.basicPublish("",queueName,null,message.getBytes());
//发布确认,这个时候要是进行发布确认就是同步了
/*
1。此处记录下所有要发送的消息,消息的总和
*/
outstandingConfirms.put(channel.getNextPublishSeqNo(),message);
System.out.println();
}
//结束时间
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "异步发布确认消息,耗时" + (end-begin) +"毫秒");

}

以上三种发布的确认速度对比

单步发布消息

  • 同步等待确认,简单,但是吞吐量非常有限

批量发布确认

  • 批量同步等待确认,加单,合理的吞吐量,一旦出现问题,但很难推断出是那条消息出现了问题。

异步发布确认

最佳性能和资源使用,在出现错误的情况下可以很好的控制,但是实现起来稍微难一点。

交换机(exchanges)

当使用到交换机的时候,我们用的就不是普通的模式了,而是发布订阅模式了。

生产者生成的消息不会直接发送到队列

而是直接将消息先发送到交换机,并且只能发送到交换机,之钱的我们可以直接发送到队列(事实上我们走的是默认交换机),然后队列交给消费者,现在不行了,改成先发送给交换机,交换机在发给队列。

交换机的工作方式很简单,他接收来自生产者的消息,另一方面将他们推送到队列。

交换机的类型

直接(direct),主题(topic),标题(heads),扇出(fanout)

无名exchanges

事实上就是默认类型,我们通过空字符(””)串进行标识。

channel.basicPublic("","hello",null,message.getBytes());

实际上第一个参数就是交换机的名字,空字符串表示默认或者无名称交换机,消息能路由发送到队列中,其实是由routing(bindingkey)绑定key指定的,如果它存在的话。

临时队列

实际上是自定义的,一旦断开链接,这个队列就会被删除。

绑定

实际上就是交换机和队列的桥梁,他告诉我们交换机和那个队列进行绑定关系

Fanout

fanout是一种交换机的类型,这种类型非常简单,正如名称中猜想的那样,他是将接收的所有消息广播到他知道的所有队列中,系统中默认有exchanges交换机类型。

1667315273358

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
发消息 交换机
*/
public class EmitLog {
//交换机的名称
public static final String EXCHANGE_NAME="logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner sc = new Scanner(System.in);

while (sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:" + message);

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
消息接收,客户端
*/
public class ReceiveLog {
//交换机的名称
public static final String EXCHANG_NAME="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANG_NAME,"fanout");
//声明一个队列,来一个临时队列
/*
队列的名称是随机的。
当消费者断开与队列的连接的时候,队列就可以自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/*
绑定交换机和队列
*/
channel.queueBind(queueName,EXCHANG_NAME,"");
System.out.println("等待接受消息,把消息显示在屏幕上");
//接受消息
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println("Receive控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
};
//消费者取消消息时回调接口
channel.basicConsume(queueName,true,deliverCallback,consumer->{

});
}
}

Direct

从图上我们可以看到,X绑定了两个队列,绑定类型是direct,队列Q1绑定建为orange,队列2绑定键有两个,一个为black,另一个为green.

在这种情况下,发布这发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1绑定键为blackgreen的消息会被发布到队列Q2,其他消息类型就被丢弃。

多重绑定

当然如果exchange的绑定类型是direct,但是他绑定的多个队列的key如果多相同在这种情况下虽然绑定类型是direct,但是他表现的就和fanout有点想类似了,就和广播差不多了。

这边就可以指定发给谁了,我不发给谁他就收不到消息,只有我允许他接收他才能得到接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class DirectLogs {

//交换机的名称
public static final String EXCHANGE_NAME="direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
Scanner sc = new Scanner(System.in);

while (sc.hasNext()){
String message = sc.next();
channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息:" + message);
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ReceiveLogDirect01 {
public static final String EXCHANGE_NAME ="direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个交换机
channel.queueDeclare("console",false,false,false,null);
/*
绑定交换机和队列
*/
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warning");
//接受消息
DeliverCallback deliverCallback = (consumer, message)->{
System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
};

//消费者取消消息时回调接口
channel.basicConsume("console",true,deliverCallback,consumer->{});

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReceiveLogDirect02 {
public static final String EXCHANGE_NAME ="direct_logs";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个交换机
channel.queueDeclare("disk",false,false,false,null);
/*
绑定交换机和队列
*/
channel.queueBind("disk",EXCHANGE_NAME,"error");
//接受消息
DeliverCallback deliverCallback = (consumer, message)->{
System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String(message.getBody(),"UTF-8") );
};

//消费者取消消息时回调接口
channel.basicConsume("disk",true,deliverCallback,consumer->{});
}
}

Topic

对比上面两种交换机更加完美,当存在我们要接受日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个·时候上边两种交换机就做不到了。此时我们采用的就是topic交换机类型了。

Topic

发送类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,他必须是一个单纯列表,以点号隔开,这个单次可以是任意单词。”nysc.xxx”等。限制要求单词列表不能超过255个字节。

需要注意的是

* 代表的是一个单词

‘#’ 代表的是可以替换零个或多个单词。

下图绑定关系如下

Q1—> 绑定的是

中间代orange带3个单词的字符串( .orange.

Q2—> 绑定的是

最后一个单词是rabbit的3个单词( . rabbit)

第一个单词是lazy的多个单词(lazy.#)

下边我们将上图之间的数据接受情况列举出来

quick.orange.rabbit 被队列Q1Q2接受到

lazy.orange.elephant 被队列Q1Q2接受到

quick.orange.fox 被队列Q1接受到

lazy.brown.fox 被队列Q2接受到

lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接受一次

quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃

quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃

lazy.orange.male.rabbit 是四个单词但匹配Q2

==上述加粗的是符合TOPIC类型交换机的单词指令==

==是最强大的,也是使用最广的==

  • 当队列绑定关系是#,那么这个队列将接收所有的数据,有点像Fanout了。
  • 如果队列绑定的键中没有#和*出现,那么该队列绑定类类型就是direct了。

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
声明主题交换机
*/
public class ReceiveLogsTopic01 {

//老样子定义一个交换机名
public static final String EXCHANGE_NAME= "topic_logs";

//接受消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
String queueName = "Q1";
channel.queueDeclare(queueName,false,false,false,null);

//绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接受消息");

DeliverCallback deliverCallback = (consumer,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接受队列:——> " +queueName +"绑定键:"+ message.getEnvelope().getRoutingKey());
};
//接收消息

channel.basicConsume(queueName,true,deliverCallback,consumer->{});
}

}

等待接受消息
被队列Q1Q2接受到
接受队列:——> Q1绑定键:lazy.orange.elephant
被队列Q1Q2接受到
接受队列:——> Q1绑定键:quick.orange.rabbit
被队列Q1接受到
接受队列:——> Q1绑定键:quick.orange.fox

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
声明主题交换机
*/
public class ReceiveLogsTopic02 {

//老样子定义一个交换机名
public static final String EXCHANGE_NAME= "topic_logs";

//接受消息
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//声明一个交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明队列
String queueName = "Q2";
channel.queueDeclare(queueName,false,false,false,null);

//绑定
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接受消息");

DeliverCallback deliverCallback = (consumer,message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接受队列:——> " +queueName +"绑定键:"+ message.getEnvelope().getRoutingKey());
};
//接收消息

channel.basicConsume(queueName,true,deliverCallback,consumer->{});
}
}

等待接受消息
被队列Q1Q2接受到
接受队列:——> Q2绑定键:lazy.orange.elephant
被队列Q2接受到
接受队列:——> Q2绑定键:lazy.brown.fox
被队列Q1Q2接受到
接受队列:——> Q2绑定键:quick.orange.rabbit
虽然满足两个绑定但只被队列Q2接受一次
接受队列:——> Q2绑定键:lazy.pink.rabbit
是四个单词但匹配Q2
接受队列:——> Q2绑定键:lazy.orange.male.rabbit

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/*
生产者
*/
public class EmitLogTopic {
//老样子,还是定义一个交换机名字,注意这个交换
// 机名字需要和消费者里边定义的交换机一样,避免出错

private static final String EXCHANGE_NAME="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
//获取信道链接
Channel channel = GetConnection.getChannel();
/*
Q1--> 绑定的是
中间代orange带3个单词的字符串(* .orange. *)
Q2--> 绑定的是
最后一个单词是rabbit的3个单词(* . *rabbit)
第一个单词是lazy的多个单词(lazy.#)
quick.orange.rabbit 被队列Q1Q2接受到
lazy.orange.elephant 被队列Q1Q2接受到
quick.orange.fox 被队列Q1接受到
lazy.brown.fox 被队列Q2接受到
lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接受一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配Q2


*/
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","被队列Q1Q2接受到");
map.put("lazy.orange.elephant","被队列Q1Q2接受到");
map.put("quick.orange.fox","被队列Q1接受到");
map.put("lazy.brown.fox","被队列Q2接受到");
map.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接受一次");
map.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
map.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");

for (Map.Entry<String, String> mapEntry : map.entrySet()) {
String routingKey = mapEntry.getKey();
String message = mapEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("生产者发送消息: " + message);
}
}
}

生产者发送消息: 是四个单词不匹配任何绑定会被丢弃

生产者发送消息: 不匹配任何绑定不会被任何队列接收到会被丢弃

生产者发送消息: 被队列Q1Q2接受到

生产者发送消息: 被队列Q2接受到

生产者发送消息: 被队列Q1Q2接受到

生产者发送消息: 被队列Q1接受到

生产者发送消息: 虽然满足两个绑定但只被队列Q2接受一次

生产者发送消息: 是四个单词但匹配Q2

死信队列

死信:顾名思义就是无法被消费的消息,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成死信,有死信自然就有死信队列。

一般应用场景:为了保证订单业务中的消息数据不丢失,需要使用rabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入死信队列中,还有比如说:用户在商场下单成功并点击去支付后在指定时间未支付时自动失效。

死信的来源

  • 消息TTL过期
  • 队列达到最大长度(队列满了,无法再添加数据到mq中)
  • 消息被拒绝(basic.reject或basic.nack)并且requeue= false;

死信实战

消息TTL过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
死信队列
之生产者代码
*/
public class Product {
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//死信消息,设置TTL时间
AMQP.BasicProperties properties =
new AMQP.BasicProperties()
.builder().expiration("10000").build();

for (int i = 0; i < 11; i++) {
String message = "info "+ i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
死信
消费者
*/
public class Consumer01 {

//普通交换机的名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//声明死信和普通交换机类型 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

// 普通队列
Map<String,Object> argument = new HashMap<>();
//过期时间10秒过期 可以不设置,可以交给队列自动去
// argument.put("x-message-ttl",100000);
//正常队列设置死信交换机
argument.put("x-dead-lettle-exchange",DEAD_EXCHANGE);
//设置死信RoutingKey
argument.put("x-dead-lettle-routing-key","lisi");

channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);

///////////////////////////////////
//声名死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

//绑定普通交换机和普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信的交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接受......");

//绑定死信的交换机和死信的对列
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
};

channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumer->{});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Consumer02 {

//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

System.out.println("等待接受......");

//绑定死信的交换机和死信的对列
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println("Consumer 02接受到消息是:" + new String(message.getBody(),"UTF-8"));
};

channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumer->{});
}
}

==解决进不去死信队列的原因了,就是我没给他把过期时间值传进去,导致出现问题。==

队列达到最大长度

消息生成者代码去掉TTL属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class Consumer01 {

//普通交换机的名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//声明死信和普通交换机类型 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

// 普通队列
Map<String,Object> argument = new HashMap<>();
//过期时间10秒过期 可以不设置,可以交给队列自动去
// argument.put("x-message-ttl",10000);
//正常队列设置死信交换机
argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信RoutingKey
argument.put("x-dead-letter-routing-key","lisi");

//设置正常队列的长度的限制
argument.put("x-max-length",6);


channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);

///////////////////////////////////
//声名死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

//绑定普通交换机和普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信的交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接受......");

//绑定死信的交换机和死信的对列
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
};

channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,consumer->{});
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Product {
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//死信消息,设置TTL时间 单位是ms
// AMQP.BasicProperties properties =
// new AMQP.BasicProperties()
//.builder().expiration("10000").build();

for (int i = 0; i < 11; i++) {
String message = "info "+ i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
System.out.println("生产者发送消息:"+message);
}
}
}

消息被拒绝

一旦消费者拒绝接受,就会成为死信队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Product {
//普通交换机的名称
public static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
//死信消息,设置TTL时间 单位是ms
//演示长度 AMQP.BasicProperties properties =
// new AMQP.BasicProperties()
// .builder().expiration("10000").build();

for (int i = 0; i < 11; i++) {
String message = "info "+ i; //properties
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes());
System.out.println("生产者发送消息:"+message);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
public class Consumer01 {

//普通交换机的名称
private static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机的名称
private static final String DEAD_EXCHANGE = "dead_exchange";
//普通队列名称
private static final String NORMAL_QUEUE = "normal_queue";
//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();

//声明死信和普通交换机类型 direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

// 普通队列
Map<String,Object> argument = new HashMap<>();
//过期时间10秒过期 可以不设置,可以交给队列自动去
// argument.put("x-message-ttl",10000);
//正常队列设置死信交换机
argument.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信RoutingKey
argument.put("x-dead-letter-routing-key","lisi");

//设置正常队列的长度的限制
//演示拒绝 argument.put("x-max-length",6);


channel.queueDeclare(NORMAL_QUEUE,false,false,false,argument);

///////////////////////////////////
//声名死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

//绑定普通交换机和普通队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信的交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接受......");

//绑定死信的交换机和死信的对列
DeliverCallback deliverCallback = (consumer,message)->{
//拒绝消息在这里写
String message1 = new String(message.getBody(),"UTF-8");
//指定你要拒绝的消息
if(message1.equals("info 5")){
System.out.println("此消息是被拒绝的"+ message1);
//拒接接收,且不放回队列,避免队列重新发送
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {

System.out.println("Consumer 01接受到消息是:" + new String(message.getBody(),"UTF-8"));
// 接收且不放回
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}

};
//开启手动应答, //true是自动应答。
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumer->{});
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Consumer02 {

private static final String DEAD_EXCHANGE = "dead_exchange";

//死信队列名称
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = GetConnection.getChannel();
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
System.out.println("等待接受......");

//绑定死信的交换机和死信的对列
DeliverCallback deliverCallback = (consumer,message)->{
System.out.println("Consumer 02接受到消息是:" + new String(message.getBody(),"UTF-8"));
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumer->{});
}
}

延迟队列

延迟队列,对列内部是有序的,最重要的特性就是体现在他的延时属性上,延时队列中的元素时希望在指定时间到了之后或者之前取出和处理,简单的来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列的使用场景

  1. 订单在十分钟内未支付则自动取消
  2. 新创建的店铺,如果在十天内没有上传商品,则自动发送消息提醒
  3. 用户注册成功后,如果没有在三天内登录则发送短信提醒消息
  4. 用户发起退款,如果在三天内没有得到处理则通知相关运营人员。
  5. 预定会员后,需要在预定的时间点前十分钟通知各个与会员人员参加会议。

延迟队列整合SpringBoot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xh</groupId>
<artifactId>SpringBoot_RabbitMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>SpringBoot_RabbitMQ</name>
<description>SpringBoot_RabbitMQ</description>

<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.7.RELEASE</spring-boot.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<!--rabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>

<!--swagger-->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.9.2</version>
</dependency>

<!--RabbitMQ测试依赖 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<dependencyManagement>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>2.3.7.RELEASE</version>
<configuration>
<mainClass>com.xh.springBoot.SpringBootRabbitMqApplication</mainClass>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig(){
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo(){
return new ApiInfoBuilder()
.title("rabbitMQ接口文档")
.description("描述Rabbit微服务接口定义")
.version("1.0")
.contact(new Contact("我喜欢","127.0.0.1","2844****670@qq.com"))
.build();
}
}

队列TTL

创建两个队列QA和QB,两者队列TTL分被是10s,40s,然后在创建一个交换机X和死信交换机Y,他们的类型都是direct,创建一个死信队列QD,他们的绑定关系如下

代码文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/*
TTL 队列 配置文件类代码
*/
@Configuration
public class TtlQueueConfig {
//普通的交换机名称
public static final String X_EXCHANGE = "X";
//死信交换机的名称
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
//普通队列的名称
public static final String A_QUEUE = "QA";
public static final String B_QUEUE = "QB";
//死信队列的名称
public static final String Y_DEAD_LETTER_QUEUE = "QD";

//声明XCHANGE
@Bean("xExchange") //起别名
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明yExchange 别名
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

//声明队列
@Bean("queueA")
public Queue queueA(){
Map<String,Object> map =new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
map.put("x-dead-letter-routing-key","YD");
//设置TTL单位是ms 什么时候消息成为死信,10秒钟后
map.put("x-message-ttl",10000);

return QueueBuilder.durable(A_QUEUE).withArguments(map).build();
}

//声明队列
@Bean("queueB")
public Queue queueB(){
Map<String,Object> map =new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
map.put("x-dead-letter-routing-key","YD");
//设置TTL单位是ms 什么时候消息成为死信,40秒钟后
map.put("x-message-ttl",40000);

return QueueBuilder.durable(B_QUEUE).withArguments(map).build();
}

//死信队列
@Bean("queueD")
public Queue QueueD(){
return QueueBuilder.durable(Y_DEAD_LETTER_QUEUE).build();
}

//绑定
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");

}
//绑定
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");

}
//绑定
@Bean
public Binding queueDBindingX(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
发送延迟消息
http://地址/ttl/sendMessage/子非吾喵
*/
@Slf4j //打印日志
@RestController
@RequestMapping("/ttl")
public class SellMessageController {

@Autowired
private RabbitTemplate rabbitTemplate;

//开始发送消息
@GetMapping("/sendMessage/{message}")
//注意@PathVariable 可以自定义传值到url中,前提示变量名必须和url中的一样
public void sendMessage(@PathVariable String message){
//后边的语句参数会替换{},这由程序员控制
log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的消息队列:" + message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的消息队列:" + message);
}
}

http://127.0.0.1:8080/ttl/sendMessage/hello1

http://127.0.0.1:8080/ttl/sendMessage/hello2

上述存在一个不足之处,就是我们每次使用的时候,都需要增加一个新的时间需求,就需要新增一个队列,这里只是用了10s和40s两个时间选项,但万一我要一个小时呢,难不成还又要添加,这样太麻烦了。

延迟队列优化

使用QC来设置时间,用户自定义时间。

具体代码如下

TtlQueueConfig.java中添加

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static final String C_QUEUE = "QC";

//声明QC
@Bean("queueC")
public Queue queueC(){
Map<String,Object> map = new HashMap<>(3);
//设置死信交换机
map.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
//设置死信RoutingKey
map.put("x-dead-letter-routing-key","YD");
//TTL设置时长 ms (这里不写,又用户自己定义)
return QueueBuilder.durable(C_QUEUE).withArguments(map).build();

}
//绑定
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}

​ SellMessageController.java

1
2
3
4
5
6
7
8
9
10
11
//开始发送消息 消息 TTL
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",
new Date().toString(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC",message,msg -> {
//发送消息的时候,延迟时长
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
} );
}

==注意我在写这个代码的时候,刚刚开始是敲错了一个字母,导致运行时生成的队列,然后调错的时候,需要生成的队列删除,然后重新创建,可以解决问题。。==

>http://127.0.0.1:8080/ttl/sendExpirationMsg/hello1/2000
>
>http://127.0.0.1:8080/ttl/sendExpirationMsg/hello2/20000

死信在做延迟的一个巨大缺陷,消息不会按时“死亡”,由于RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延迟时长很短,则第二个消息并不会优先得到执行。

这里出现一个问题,当出现两条消息的时候,我们发现,他居然是按照谁先来的,哪怕你时间短,但是你比另一个队列慢一步,你就只能老老实实排队了。1667661985681

RabbitMQ插件实现延迟队列

这个就是解决上边的问题的,我们使用插件解决。

在官网上下载: https://www/rabbitmq.com/community-plugins.html,下载

rabbitmq_delayed_message_exchange-3.8.0

拷贝到

/usr/lib/rabbitmq/lib/rabbitmq_server-版本号/plugs

安装指令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

然后在linux系统下解压

然后安装rabbitmq-plugins enable rabbitmq_delayed_message_exchange

之后重启

systemctl restart rabbitmq-server(这个是你安装时候的名字)

这个时候不在是队列,而是交换机了。

没插件的时候

基于插件的

==注意,由于我是docker安装的,我没找到安装这个插件的位置,故而,这一点,我只能敲一下代码了解一下==

解决办法:

https://blog.csdn.net/DZP_dream/article/details/118391439

docker search rabbitmq

安装拉取容器:

docker run -dit --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:management

https://www.rabbitmq.com/community-plugins.html

docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins

1
2
[root@local rabbitmq]# docker exec -it rabbitmq /bin/bash                                     
root@3bb56f68570b:/# rabbitmq-plugins list

启动插件

root@3bb56f68570b:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

DelayedConfig

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
延迟队列插件,可惜,我没有。
*/
@Configuration //实例化
public class DelayedQueueConfig {

//队列 //
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routing";

@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}

//声明交换机 基于插件的交换机就是这么定义的
@Bean
//自定义交换机 由于是不存在rabbit里边的交换机,也就是我们使用的延迟队列插件
public CustomExchange delayedExchange(){
Map<String,Object> map = new HashMap<>();
map.put("x-delayed-type","direct");
/*
1.交换机名称
2.交换机类型
3.是否需要持久化
4.是否需要自动删除
5.其他参数
*/
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",
true,false,map);
}

//绑定
@Bean
public Binding delayedQueueBingdingDelayedExchange(
@Qualifier("delayedQueue") Queue delayedQueue,
@Qualifier("delayedExchange") CustomExchange delayedExchange
){
//绑定 将队列和交换机直接进行绑定
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
* @Title 发送消息 基于插件的 消息以及 延迟的时间
* @Description 延迟队列的插件
* @author 罗小黑
* @date 2022/11/6 11:40
*/
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayedMsg(@PathVariable String message,@PathVariable Integer delayedTime){
log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayedTime:{}",
new Date().toString(),delayedTime,message);
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> {
//发送消息的时候,延迟时长 单位ms
msg.getMessageProperties().setDelay(delayedTime);
return msg;
} );
}

消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*
延迟队列消费者
*/
@Component
@Slf4j
public class DelayedQueueConsumer {
/*
基于插件的延迟队列
*/

//监听消息
@RabbitListener(queues= DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);
}
}

http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/20000

http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/2000

第二个消息被先消费掉了,符合预期。

总结:

延迟队列在需要延时的处理的场景下非常有用,使用rabbitmq来实现延迟队列可以很好的利用rabbitmq的特性,如消息的可靠发送,消息可靠投递,死信队列来保证消息至少被消费一次已经未被正常处理的消息不会被丢弃,另外,通过rabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者信息丢失

当然,延迟队列还有很多其他选择,比如利用java的delayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有各的特点,看需要的适用的场景。

高级部分:

发布确认高级

在生产环境下中由于一些不明的原因,导致rabbitmq重启,在rabbitmq 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复,这个时候,我们如何保证rabbitmq的消息的可靠投递呢?

高级就是,当生产者发送消息给交换机,但是交换机收不到的时候,我们将消息放到缓存中,当交换机可以收到消息的时候,我们就把消息从缓存中移除。一般会出现的问题是,交换机收不到消息,二是队列收不到消息。

代码架构部分

配置文件

在spring.rabbitmq.publisher-confirm-type=correlated

confirm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

/*
发布确认配置类
目的是为了 发布确认 (高级)
*/
public class ConfirmConfig {

//交换机
public static final String CONFIRM_EXCHANGE_NAME= "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//RoutingKey
public static final String CONFIRM_ROUTING_KEY = "key1";

//声明一个交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明一个队列
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
/*
Product接收消息

*/
@Slf4j
@Component
public class Consumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirmMessage(Message message){
String msg = new String(message.getBody());
log.info("接受到队列confirm.queue的消息: {} " ,msg);
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {

@Autowired
private RabbitTemplate rabbitTemplate ;

//发送消息

@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message);
}
}

回调接口

一旦消息没有被接受我们采取回调接口去确定。

NONE

禁用发布确认模式,是默认值

CORRELATED

发布消息成功到交换器后会触发回调方法

SIMPLE

经过测试有两种效果,其一效果和CORRELATED值一样会触发回调方法。

其二在发布消息成功后采用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法,如果返回fasle则会关闭channel,则接下来无法发送消息到broker.

1
2
3
4
5
6
7
# 应用名称
spring.application.name=SpringBoot_RabbitMQ
spring.rabbitmq.host=114.132.77.86
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

定义一个回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
定义一个回调接口
*/
@Component //1
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback{

//由于是内部接口,故而不能直接我们实现的接口,我们需要直接注入
//注入
@Autowired //2
private RabbitTemplate rabbitTemplate;
@PostConstruct //注入 3 一步一步来,这个注解是在其他注解执行完毕之后才会执行,切记
public void init(){
//注入
rabbitTemplate.setConfirmCallback(this);
}

/*
交换机确认回调方法
1.发消息 交换机接收到了消息 回馈
1.1correlationData保存回调消息ID以及相关信息
1.2交换机接收到消息 ack = true
1.3 cause null
2.发送消息 交换机接收失败了问题
2.1 correlationData 保存回调信息的ID以及相关信息
2.2 交换机接收到消息 ack = false
2.3 cause 失败的原因
*/

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId()!=null ? correlationData.getId() : "" ;
if(ack){
log.info("交换机已经收到的ID为: {}的消息",id);
}else{
log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
//开始发消息,测试确认

*/
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {

@Autowired
private RabbitTemplate rabbitTemplate ;

//发送消息

@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData1 = new CorrelationData("1");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData1);
log.info("发送消息内容:{}",message + "key1");

CorrelationData correlationData2 = new CorrelationData("2");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData2);
log.info("发送消息内容:{}",message + "key2");

}
}

回退消息

在开启了生成者确认机制情况下,交换机接收到消息后,会直接将消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,那么如何让无法被路由的消息帮我想办法处理一下?最简单的就是通知我一下,我好方便处理,通过设置mandatory参数就可以在当消息传递过程中不可达目的时将消息返回给生成者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Component //1
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{

//由于是内部接口,故而不能直接我们实现的接口,我们需要直接注入
//注入
@Autowired //2
private RabbitTemplate rabbitTemplate;
@PostConstruct //注入 3 一步一步来,这个注解是在其他注解执行完毕之后才会执行,切记
public void init(){
//注入
rabbitTemplate.setConfirmCallback(this);
//不注入的化没有结果
rabbitTemplate.setReturnCallback(this);
}

/*
交换机确认回调方法
1.发消息 交换机接收到了消息 回馈
1.1correlationData保存回调消息ID以及相关信息
1.2交换机接收到消息 ack = true
1.3 cause null
2.发送消息 交换机接收失败了问题
2.1 correlationData 保存回调信息的ID以及相关信息
2.2 交换机接收到消息 ack = false
2.3 cause 失败的原因
*/

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData.getId()!=null ? correlationData.getId() : "" ;
if(ack){
log.info("交换机已经收到的ID为: {}的消息",id);
}else{
log.info("交换机还未收到ID为:{}的消息,由于原因:{}",id,cause);
}
}

//回退接口实现,可以在当消息在传递过程中不可达
// 目的时将消息返回给生产者
//只有当消息不可达目的地时候才进行回退
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息:{},被交换机{} 退回了,退回原因是:{},路由Key:{}",
new String(message.getBody()),exchange,replyText,routingKey);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {

@Autowired
private RabbitTemplate rabbitTemplate ;

//发送消息

@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData1 = new CorrelationData("1");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData1);
log.info("发送消息内容:{}",message + "key1");

CorrelationData correlationData2 = new CorrelationData("2");

rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY + "2" ,message + "key2",correlationData2);
log.info("发送消息内容:{}",message + "key2");

}
}

回退处理

http://127.0.0.1:8080/confirm/sendMessage/你好

备份交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
发布确认配置类
目的是为了 发布确认 (高级)
*/

@Configuration
public class ConfirmConfig {

//交换机
public static final String CONFIRM_EXCHANGE_NAME= "confirm_exchange";
//队列
public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
//RoutingKey
public static final String CONFIRM_ROUTING_KEY = "key1";

//备份交换机
public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";

//备份队列
public static final String BACKUP_QUEUE_NAME = "backup_queue";
//报警队列
public static final String WARNING_QUEUE_NAME = "warning_queue";


//声明一个交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange(){ //持久化
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true).withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
// return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
//声明一个队列
@Bean("confirmQueue")
public Queue confirmQueue()
{
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

//绑定
@Bean
public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange){
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
//创建一个备份交换机
@Bean("backupExchange")
public FanoutExchange backupExchange(){
return new FanoutExchange(BACKUP_EXCHANGE_NAME);
}
@Bean("backupQueue")
public Queue backupQueue(){
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}

@Bean("warningQueue")
public Queue warningQueue(){
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}
//备份交换机邦定到备份队列

@Bean
public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(backupQueue).to(backupExchange);
}

//报警交换机邦定到报警

@Bean
public Binding queueBindingWarningExchange(@Qualifier("warningQueue") Queue warningQueue,
@Qualifier("backupExchange") FanoutExchange backupExchange){
return BindingBuilder.bind(warningQueue).to(backupExchange);
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/*
报警消费者
*/
@Slf4j
@Component
public class WarningConsumer {
//接受报警信息
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message message){
String msg = new String(message.getBody());
log.info("报警发现不可路由消息: {}" ,msg);

}
}

幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等

消息被重复消费

消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断,故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

解决思路

MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。

消费端的幂等性保障
在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一ID+指纹码机制,利用数据库主键去重,b.利用redis的原子性去实现

唯一lD+指纹码机制
指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

Redis原子性(重点选择)
利用redis执行setnx命令,天然具有幕等性。从而实现不重复消费。

优先级队列

在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis来存放的定时轮询,大家都知道 redis 只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。

如何添加

a.控制台添加

b.队列中代码添加优先级

1
2
3
Map<String,Object> parame = new HashMap<>();
parame.put("x-max-prority",10);
channel.queueDeclare("hello",true,false,false,params);

c.消息中代码添加优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Product {
//队列名称

public static final String QUEUE_NAME="hello";

//发消息
public static void main(String[] args) throws IOException, TimeoutException {
//建立一个连接方式
ConnectionFactory factory = new ConnectionFactory();
//工厂IP 连接RabbitMQ的队列
factory.setHost("www.littlehei.fun");
//用户名
factory.setUsername("guest");

//密码
factory.setPassword("guest");

//创建链接
Connection connection = factory.newConnection();

//获取信道
Channel channel = connection.createChannel();

//生成一个队列
/*
生成队列
参数1,队列名称
参数2,队列里边的消息是否持久化(磁盘)默认情况消息存储在内存中
参数3,该队列是否只供一个消费者进行消费,是否进行消息共享,true可以多个消费者消费,false:只能一个消费者消费
参数4,是否自动删除,最后一个消费者断开链接以后,该队是否自动删除,true,自动删除,false不删除
参数5,其他参数
*/
Map<String,Object> map = new HashMap<>();
map.put("x-max-priority",10);//官方允许是0-255之间,此处设置10,允许优化级范围为0-10 不要设置过大,浪费CPU和内存

channel.queueDeclare(QUEUE_NAME,true,false,false,map);

//发消息

for (int i = 0; i < 10; i++) {
String message = "info" + i;//初次使用

if(i == 5){
AMQP.BasicProperties properties =
new AMQP.BasicProperties().builder().priority(5).build();
channel.basicPublish("",QUEUE_NAME,properties,message.getBytes());
}else {
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());

}
}
/*
发送一个消费
参数1,发送到哪个交换机
参数2.路由的key值是哪个? 本次是队列的名称
参数3,其他参数配置
参数4,发送消息的消息体
*/
System.out.println("消息发送成功");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Consumer {
//队列的名称:
private static final String QUEUE_NAME="hello";

//接收消息
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("www.littlehei.fun");
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//声明
DeliverCallback deliverCallback = (consumerTag, message)->{
// String message = new String("自己手动去创建一个消息,但是不推荐");
System.out.println(new String(message.getBody()));
};

//取消消息时的回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println("消息消费被中断");
};

/*
消费者接收消息
参数1,消费哪个队列
参数2,消费成功之后是否要自动应答,true表示自动应答,false表示不是
参数3,消费者未成功消费的回调
参数4,消费者取录消费的回到
*/

channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}

d.注意事项

要让队列实现优先级需要做的事情有如下事情:

队列需要设置优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序

惰性队列

RabbitMQ从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、若机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。+

1667797292928

当消息大量过来,我们短时间处理不过来,这个时候我们采用惰性队列了

队列具备两种模式:default 和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。
如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default’和lazy”。下面示例中演示了一个惰性队列的声明细节:

Map args =new HashMap();

args.put(”x-queue-mode”,”lazy”);

channel.queueDeclare(”myqueue”,false,false,false,args);

在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB

集群部分:

clustering

搭建集群

如果是本地虚拟机的话,我们就将几台虚拟机进行克隆,造成集群环境。

然后选择一台服务器作为集群,其他两台服务器加入进去。

  1. 修改3台机器的主机名称

    vim /etc/hostname

    vim /etc/host进入后更该就行

    ​ ip地址 node1

    ​ ip地址 node2

    ​ ip地址 node3

  1. 配置各个节点的host文件,让各个节点都能相互识别对方。

  2. 确保各个节点的cookie文件使用的是同一个值。

    scp/var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmg/.erlang.cookie

    scp/var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmg/.erlang.cookie

  1. 启动RabbitMQ服务,顺带启动Erlang虚拟机和rabbitMQ应用服务(在三台节点上分布执行命令)

    rabbitmq-server-detached

  2. 在节点2执行

    rabbitmqctl stop_app

    (rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqct stop_app 只关闭RabbitMQ服务)

    rabbitmqctl reset

    rabbitmqctl join_cluster rabbit@node1

    rabbitmqctl start_app(只启动应用服务)

  3. 在节点3执行

    rabbitmqctl stop_app

    rabbitmqctl reset

    rabbitmqctl join_cluster rabbit@node2

    rabbitmqctl start_app(只启动应用服务)

  4. 集群状态

    rabbitmqctl cluster_status

  5. 需要重新设置用户角色

    创建账号

    ​ rabbitmqctl add_user admin 123

    设置用户角色

    ​ rabbitmqctl set_user_tags admin adminstractor

    设置用户权限

    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

    此时如图,表示存在集群;

  6. 解除集群节点(node2,和node3分别执行)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    rabbitmqctl stop_app

    rabbitmqctl reset

    rabbitmqctl start_app

    rabbitmqctl cluster_status

    rabbitmqetl forget_cluster_node rabbit@node2(node1机器上执行)

镜像队列

如果RabbitMQ集群中只有一个Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗,通过publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。
引入镜像队列(Miror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。

搭建步骤

  1. 启动三台集群节点

  2. 随便找一个节点添加policy

单点无法连接

采用负载均衡,Haproxy + keepalive 实现高可用负载均衡。

FederationExchange(联邦交换机)

(broker北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA中,就算在开启了publisherconfirm机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA发送消息,那么(Client深圳)(broker北京)之间有很大的网络延迟,(Client 深圳)将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm机制或者事务机制的情况下,(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。

将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现?
这里使用Federation 插件就可以很好地解决这个问题.

由于两个地区的服务器比较远,这个时候就存在一种服务延迟情况,我们的federation交换机就是解决这个问题的。

搭建步骤:

  1. 需要保证每台节点单独运行

  2. 在每台机器上开启federation相关插件

    rabbitmq-plugins enable rabbitmq_federation

    rabbitmq-plugins enable rabbitmq federation_management

    每台服务器上都装一下。

    两个指令都需要安装。

  3. 原理图:(高山流水,只能从上游流向下游)。

    注意的是,必须从创建好下游节点的交换机,才能够将消息从上游传到下游。上游的消息在下游不能接收,那么这个消息就无法传递到下游。

  1. 在downstean(node2)配置upstream(node1);
  1. 添加规则

联邦队列

Shovel

使用它的原因
Federation 具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker上。Shovel可以翻译为“铲子”
是一种比较形象的比喻,这个”铲子”可以将消息从一方“铲子”另一方。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。
搭建步骤
1.开启插件(需要的机器都开启)

rabbitmq-plugins enable rabbitmq_shovel

rabbitmq-plugins enable rabbitmq_shovel_management