RabbitMQ学习笔记 分布式架构:一个请求由多个服务(服务或者系统)协同处理完成。
RabbitMQ:消息队列,或者说是一个消息中间键。
MQ功能:
削峰 :不让系统直接访问系统,而是通过MQ,然后交给MQ去访问系统,使得访问的人员进行排队。(有排队就会导致比较慢,但是安全有次序。)
应用解耦 :通过队列去访问系统,各个系统之间通过MQ去完成,也就会不会产生过多故障。
异步处理 :A-B,并不要A收到B的确定消息才会执行操作。
MQ的分类:
RabbitMQ: 和spring是一家公司的,使用AMQP(高级消息队列协议)基础上完成的,支持多种语言,缺点是商业版要付费。
kafka:为大数据而生,有百万级TPS的,实时计算,吞吐量高,使用在日志采集等方面。缺点是,单机超过64个队列/分区,load会发生明显的飙高现象,队列越多,load越高,发送响应时间越长。消费失败不支持重试,支持消息顺序,但一代代理宕机后,会产生消息乱序,社区更新慢。
RocketMQ:由阿里开发,金融行业需要,使用java语言实现,参考了kafka,一般常用于订单,交易,充值,流计算,消息推送,日志处理,bingblog分发等场景。
优点:单机吞吐10w级别,信息可以做到0丢失,MQ功能比较完善,还是分布式,拓展性号,支持10亿级别的消息堆积,不会因为堆积而导致性能下降。
缺点:这次客户端语言不多,目前是java和C++,其中C++还不成熟,社区活跃一般。
初级部分: RabbitMQ:是一个消息中间件,可以当做一个快递站,快递员发包裹到快递站,然后本地快递员将快递发给用户。
生产者:生产者 - > mq (一个交换机可以绑定多个队列) -> 消费者,一般情况下是一个队列对应一个消费者,如果两个一个队列对应两个消费者,那么就只有一个消费者收到消息,这就想当与只有一个包裹是不可能发给两个人的。
核心部分:
简单模式
工作模式
发布订阅模式
路由模式
主题模式
发布确认模式
connection: publisher/consumer 和 broker之间的TCP连接。
Channel: 如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销是巨大的,效率也比较低。channel作为轻量级的Connection极大减少了操作系统建立TCP connection的开销。
安装rabbitMQ: 官网:https://www.rabbitmq.com
下载页面:https://www.rabbitmq.com/download.html
我一般使用docker安装
1 2 3 4 docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
其他安装方式:
首先现在linux下的压缩包,或者使用工具将安装包上传到linux系统。
安装rabbitMQ的支持依赖
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yml install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
添加常用命令:
添加开机启动chkconfig rabbitmq-server on
启动服务
找到安装路径
sbin下的service 里边的rabbitmq-server start /sbin/service rabbitmq-server start
查看服务状态/sbin/service rabbitmq-server status
停止服务/sbin/service rabbitmq-server stop
开启后台web管理插件rabbitmq-plugins enable rabbitmq_management
(docker安装的不要这么操作)
使用账号访问,一般是账号密码都是guest,当然你必须保证你的端口号开放,如果云服务器的话,15972,注意ip地址,如果有域名的话,那么可以在域名后添加:15672
直接访问.
如果不能直接访问的话,检查一下防火墙,systemctl stop firewalld
,这个命令关闭本次防火墙,systemctl enable firewalld
可以下次开机也关闭防火墙,systemctl status firewalld
可以查看当前系统防护墙状态
添加一个新用户,如果是使用虚拟机的话,一般是不能直接使用guest登录的,当然这个我没碰到过
创建一个账号rabbitmqctl add_user 用户名 密码
设置用户角色 rabbitmqctl set_user_tags admin administractor
设置用户权限
set_permission[-p <vhostpath>] <user> <conf> <write> <read>
举例:
rabbitmqctl set_permission -p "/" admin ".*" ".*" ".*"
用户user_admin具有/vhost1这个vritual host中的所有资源的配置,写,读权限。
当前用户和角色
rabbitmqctl list_user
如何使用MQ(java代码实现) IDEA新建maven工程导入依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.8.1</version > <configuration > <source > 8</source > <target > 8</target > </configuration > </plugin > </plugins > </build > <dependencies > <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.16.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.7</version > </dependency > </dependencies >
生产者和消费者: 生产者 - 消息队列 - 消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.concurrent.TimeoutException;public class Product { public static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("www.littlehei.fun" ); factory.setUsername("guest" ); factory.setPassword("guest" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); String message = "hello world" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送成功" ); } }
消费者:用来接受生成者产生的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer { private static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("www.littlehei.fun" ); factory.setUsername("guest" ); factory.setPassword("guest" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
工作队列 生成者 —-大量发消息— 队列 — 接到消息—工作线程1或者工作线程2.。。。
==注意:一个消息只能被处理一次不能处理多次==
轮训处理消息,你一个,我下一个,他下下个。
不同工作线程 之间的关系是==竞争==关系
创建链接工具类:
1 2 3 4 5 6 7 8 9 10 11 12 13 public class GetConnection { public static Channel getChannel () throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory (); connectionFactory.setHost("www.littlehei.fun" ); connectionFactory.setUsername("guest" ); connectionFactory.setPassword("guest" ); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
轮训分发代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class WorkThread1 { public static final String QUEUE_NAME = "hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); DeliverCallback deliverCallback = (consumerTag,message)->{ System.out.println("接收到的消息为" + new String (message.getBody())); }; CancelCallback cancelCallback = (consumerTag)->{ System.out.println("消息被取消消费接口回调" ); }; System.out.println("C1等待接收消息..." ); channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class Product1 { public static final String QUEUE_NAEM="hello" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.queueDeclare(QUEUE_NAEM,false ,false ,false ,null ); Scanner sc = new Scanner (System.in); while (sc.hasNext()){ String name = sc.next(); channel.basicPublish("" ,QUEUE_NAEM,null ,name.getBytes()); System.out.println("发送完成: " +name); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 ----------结果---------- nihap 发送完成nihap wp1 发送完成wp1 ci1 发送完成ci1 wooda 发送完成wooda 我喜欢你 发送完成我喜欢你 ------------------ C1等待接收消息... 接收到的消息为nihap 接收到的消息为ci1 接收到的消息为我喜欢你 -------------------- C2等待接收消息... 接收到的消息为wp1 接收到的消息为wooda
消息应答 为了防止消息在发送过程中不丢失,他是==指消费者收到消息并且处理该消息之后,告诉rabbitmq他已经处理完成了,rabbitmq可以把消息删除。==
自动应答: 需要在高吞吐量和数据传输安全性方面做权衡 ,这种模式如果消息在接受到之前,消费者那边出现链接或者channel关闭,那么消息就丢失了,当然如果另一方面这种模式消费者那边可以传递过载消息,没有对传递的消息数量进行限制,这样有可能导致消费者由于接收太多还来不及处理的消息,导致这些消息积压,最终导致内存耗尽,最终这些消费者线程被操作系统杀死,故而这种模式适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。 (不靠谱)
手动应答 能用手动则用手动。手动方法
Channel.basicAck(用于肯定确认)
Channel.basicNack(用于否定确认)
Channel.basicReject(用于否定确认),对比nack少了一个参数,不处理该消息了直接就可以拒绝,也可以将他丢弃。
批量应答(multiple):==推荐使用false==,当接受到一个消息的时候,也就是队列往信道里边放了n条数据,那么他会将此信道里边的所有消息都应答一次。
消息自动重新入队 如果消费者由于某些原因失去连接,或者说他的通道已经关闭,连接已经关闭或者TCP连接丢失,导致消息未发送ACK确认,此时RabbitMQ将了解到消息未完全处理,并将其重新排队,如果此时其他消费者可以处理,他将很快将其重新分发给另一个消费者, 这样即使某个消费者偶尔死亡,也可以确保不会丢失任何信息。
个人理解:
也就是本来做这个任务的员工有事辞职了,公司为了完成任务交给另一个还在职的员工完成这个任务,直到这个任务完成,这样保证任务不被丢失。
代码实现:
消息在手动应答的时候是不应该被丢失的,且会重新入队。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class Task { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); Scanner sc = new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes("UTF-8" )); System.out.println("生产者发送消息: " + message); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class Work_Consume1 { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); System.out.println("C1等待接受消息处理时间较短" ); DeliverCallback deliverCallback = (consumerTag, message)->{ SleepUtils.sleep(1 ); System.out.println("接收到消息" + new String (message.getBody(),"UTF-8" )); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); }; boolean autoAck = false ; channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,(consumerTag->{ System.out.println(consumerTag + "消费者取消消费接口回调逻辑" ); })); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class SleepUtils { public static void sleep (int second) { try { Thread.sleep(1000 *second); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
通过上述方式,我们可以知道消息队列在我们创建多个消费者的时候,是轮转来的,当然,假设有两个消费者,如果有一个宕机,那么消息队列就将本轮转给没宕机的那个消费者消费,==只要生产者有产生,无论如何要将消息全部消费==,不可丢弃。
Rabbit消息持久化 消息是可以持久化保存的,持久的目的是为了处理任务丢失情况的,采用持久化可以保证消息存储,且消息不被丢失。
队列如何持久化
两个持久化操作都是在生产者中进行的。
我们需要将durable参数设置为持久化
1 2 3 boolean durable = true ;channel.queueDeclare(队列名,durable,false ,false ,null )
但是需要注意的是,==就是如果之前声明的队列不是持久化的,需要把原先队列先删除,然后重新创建一个持久化队列,不然就会出现错误==。
错误:
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg ‘durable’ for queue ‘ack_queue’ in vhost ‘/‘: received ‘true’ but current is ‘false’, class-id=50, method-id=10)
注意:
==持久化后的rabbitMQ重启之后队列消息还是会存在的,未持久化的,那么对不起,他没了。==
消息持久化 将消息标记为持久化并不能保证不会丢失消息,尽管他会告诉rabbitMQ将消息保存到磁盘中,但是这里仍然存在当消息刚刚存储池在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘,持久性保证并不强,但是对于我们简单任务队列而言,这已经就绰绰有余了。
MessageProperties.PERSISTENT_TEXT_PLAIN
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class Task { public static final String QUEUE_NAME = "ack_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); boolean durable = true ; channel.queueDeclare(QUEUE_NAME,durable,false ,false ,null ); Scanner sc = new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish("" ,QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8" )); System.out.println("生产者发送消息: " + message); } } }
不公平分发: 在某些情况下轮训分发并不好用,具体例子:有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者2处理的速度却很慢,这个时候我们还采用轮训分发就会导致这个处理速度快的很大一部分时间处于空闲状态,而处理慢的那个消费者就一直在干活,这种分配方式就不是很好。
==能者多劳==,多劳多得。
为了避免这种情况,我们有采用不公平分发操作
设置参数channel.basicQos(1);
能者多劳是在消费者中设置的
1 2 3 int prefetchCount = 1 ;channel.basicQos(prefetchCount);
一般我们生活中采用的就是不公平分发。
预取值:
预先分配任务,比如生产者生成7条数据,通过队列分发,通过==预取值,预取值是多少,就给属于那条信道的消费者分配多少消息。和消费者处理消息的快慢无关。==
注意 :
预取值消耗完毕后,之后的值就按照那个消费者快给他分配的信息就多,谁慢,谁分配的信息就少。
预取值是洗发人员能限制缓冲区大小,避免缓冲区里边无限制的未确定消息问题。
需要注意的是,预取值并不是你直接输入多少条数据他就可以直接堆满的我们设置的预取值的,他可能由于消费费者处理速度影响,比如,你输入10条数据,但是设置预取值为5的最后值只会产生4条数据也说不准,因为另一个太快了,其他的数据都被他消耗完毕了。
1 2 3 4 5 int i= 5 ; channel.basicQos(i);
1 2 3 4 int i = 2 ; channel.basicQos(i);
发布和确认 生产者 - - 发送消息 — 队列hello :apple:必须保存在磁盘上才能达到持久化操作。
设置要去队列必须持久化
设置要求队列中的消息必须持久化
发布确认
开启发布确认 单个确认发布: 每次生产者生产一个消息他都会确认一次,这样的好处就是,如果发送信息丢失可以很容易确定位置,缺点是慢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void publicMessageIndividually () throws IOException, TimeoutException, InterruptedException { Channel channel = GetConnection.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" ,queueName,null ,message.getBytes()); boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("消息发送成功" ); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "单个消息" + (end-begin)+"毫秒" ); }
批量确认发布: 批量发布对比单个发布速度快了很多,但是这个碰到问题去查找问题的时候就比较慢了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public static void publicMessageBatch () throws IOException, TimeoutException, InterruptedException { Channel channel = GetConnection.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); int batchSize = 10 ; for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = i + "" ; channel.basicPublish("" ,queueName,null ,message.getBytes()); boolean flag = channel.waitForConfirms(); if (i%batchSize == 0 ){ channel.waitForConfirms(); } } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "发布100个消息" + (end-begin) +"毫秒" ); }
异步确认发布 对比单步和批量这个就厉害多了,他可以异常进行,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
寄快件的人疯狂发,然后到broker中选择哪些需要确认的进行确认。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public static void publishMessageAsync () throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true ,false ,false ,null ); channel.confirmSelect(); long begin = System.currentTimeMillis(); ConfirmCallback ackCallback = (delivery,multiple)->{ System.out.println("确认的消息:" + delivery); }; ConfirmCallback nackCallback = (delivery,multiple)->{ System.out.println("未确认的消息:" + delivery); }; channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = "消息" + i; channel.basicPublish("" ,queueName,null ,message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "异步发布确认消息,耗时" + (end-begin) +"毫秒" ); }
这就出现一个问题,如何处理异步未确认消息 最好的解决的解决方案,就是把未确认的消息放到一个基于内存的能发布线程访问的队列,比如说ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public static void publishMessageAsync () throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName,true ,false ,false ,null ); channel.confirmSelect(); ConcurrentSkipListMap<Long,String> outstandingConfirms= new ConcurrentSkipListMap <>(); ConfirmCallback ackCallback = (delivery,multiple)->{ if (multiple){ ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(delivery); confirmed.clear(); }else { outstandingConfirms.remove(delivery); } System.out.println("确认的消息:" + delivery); }; ConfirmCallback nackCallback = (delivery,multiple)->{ String message = outstandingConfirms.get(delivery); System.out.println("未确认的消息是:" + message+"未确认的标记:" + delivery); }; channel.addConfirmListener(ackCallback,nackCallback); long begin = System.currentTimeMillis(); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message = "消息" + i; channel.basicPublish("" ,queueName,null ,message.getBytes()); outstandingConfirms.put(channel.getNextPublishSeqNo(),message); System.out.println(); } long end = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "异步发布确认消息,耗时" + (end-begin) +"毫秒" ); }
以上三种发布的确认速度对比 单步发布消息
批量发布确认
批量同步等待确认,加单,合理的吞吐量,一旦出现问题,但很难推断出是那条消息出现了问题。
异步发布确认
最佳性能和资源使用,在出现错误的情况下可以很好的控制,但是实现起来稍微难一点。
交换机(exchanges) 当使用到交换机的时候,我们用的就不是普通的模式了,而是发布订阅模式了。
生产者生成的消息不会直接发送到队列
而是直接将消息先发送到交换机,并且只能发送到交换机,之钱的我们可以直接发送到队列(事实上我们走的是默认交换机),然后队列交给消费者,现在不行了,改成先发送给交换机,交换机在发给队列。
交换机的工作方式很简单,他接收来自生产者的消息,另一方面将他们推送到队列。
交换机的类型 直接(direct),主题(topic),标题(heads),扇出(fanout)
无名exchanges 事实上就是默认类型,我们通过空字符(””)串进行标识。
channel.basicPublic("","hello",null,message.getBytes());
实际上第一个参数就是交换机的名字,空字符串表示默认或者无名称交换机,消息能路由发送到队列中,其实是由routing(bindingkey)绑定key指定的,如果它存在的话。
临时队列 实际上是自定义的,一旦断开链接,这个队列就会被删除。
绑定 实际上就是交换机和队列的桥梁,他告诉我们交换机和那个队列进行绑定关系
Fanout fanout是一种交换机的类型,这种类型非常简单,正如名称中猜想的那样,他是将接收的所有消息广播 到他知道的所有队列中,系统中默认有exchanges交换机类型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class EmitLog { public static final String EXCHANGE_NAME="logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout" ); Scanner sc = new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGE_NAME,"" ,null ,message.getBytes("UTF-8" )); System.out.println("生产者发送消息:" + message); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ReceiveLog { public static final String EXCHANG_NAME="logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(EXCHANG_NAME,"fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANG_NAME,"" ); System.out.println("等待接受消息,把消息显示在屏幕上" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println("Receive控制台打印接受到的消息" + new String (message.getBody(),"UTF-8" ) ); }; channel.basicConsume(queueName,true ,deliverCallback,consumer->{ }); } }
Direct 从图上我们可以看到,X绑定了两个队列,绑定类型是direct,队列Q1绑定建为orange,队列2绑定键有两个,一个为black,另一个为green.
在这种情况下,发布这发布消息到exchange上,绑定键为orange的消息会被发布到队列Q1绑定键为blackgreen的消息会被发布到队列Q2,其他消息类型就被丢弃。
多重绑定
当然如果exchange的绑定类型是direct,但是他绑定的多个队列的key如果多相同在这种情况下虽然绑定类型是direct,但是他表现的就和fanout有点想类似了,就和广播差不多了。
这边就可以指定发给谁了,我不发给谁他就收不到消息,只有我允许他接收他才能得到接收。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class DirectLogs { public static final String EXCHANGE_NAME="direct_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); Scanner sc = new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGE_NAME,"info" ,null ,message.getBytes("UTF-8" )); System.out.println("生产者发送消息:" + message); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ReceiveLogDirect01 { public static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console" ,false ,false ,false ,null ); channel.queueBind("console" ,EXCHANGE_NAME,"info" ); channel.queueBind("console" ,EXCHANGE_NAME,"warning" ); DeliverCallback deliverCallback = (consumer, message)->{ System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String (message.getBody(),"UTF-8" ) ); }; channel.basicConsume("console" ,true ,deliverCallback,consumer->{}); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class ReceiveLogDirect02 { public static final String EXCHANGE_NAME = "direct_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk" ,false ,false ,false ,null ); channel.queueBind("disk" ,EXCHANGE_NAME,"error" ); DeliverCallback deliverCallback = (consumer, message)->{ System.out.println("ReceiveLogsDirect01控制台打印接受到的消息" + new String (message.getBody(),"UTF-8" ) ); }; channel.basicConsume("disk" ,true ,deliverCallback,consumer->{}); } }
Topic 对比上面两种交换机更加完美,当存在我们要接受日志类型有info.base和info.advantage,某个队列只想info.base的消息,那这个·时候上边两种交换机就做不到了。此时我们采用的就是topic交换机类型了。
Topic
发送类型是topic交换机的消息的routing_key不能随意写,必须满足一定的要求,他必须是一个单纯列表,以点号隔开 ,这个单次可以是任意单词。”nysc.xxx”等。限制要求单词列表不能超过255个字节。
需要注意的是
* 代表的是一个单词
‘#’ 代表的是可以替换零个或多个单词。
下图绑定关系如下
Q1—> 绑定的是
中间代orange带3个单词的字符串( .orange. )
Q2—> 绑定的是
最后一个单词是rabbit的3个单词( . rabbit)
第一个单词是lazy的多个单词(lazy.#)
下边我们将上图之间的数据接受情况列举出来
quick.orange .rabbit 被队列Q1Q2接受到
lazy .orange .elephant 被队列Q1Q2接受到
quick.orange .fox 被队列Q1接受到
lazy .brown.fox 被队列Q2接受到
lazy .pink.rabbit 虽然满足两个绑定但只被队列Q2接受一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange .male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy .orange.male.rabbit 是四个单词但匹配Q2
==上述加粗的是符合TOPIC类型交换机的单词指令==
==是最强大的,也是使用最广的==
当队列绑定关系是#,那么这个队列将接收所有的数据,有点像Fanout了。
如果队列绑定的键中没有#和*出现,那么该队列绑定类类型就是direct了。
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ReceiveLogsTopic01 { public static final String EXCHANGE_NAME= "topic_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic" ); String queueName = "Q1" ; channel.queueDeclare(queueName,false ,false ,false ,null ); channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*" ); System.out.println("等待接受消息" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println(new String (message.getBody(),"UTF-8" )); System.out.println("接受队列:——> " +queueName +"绑定键:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true ,deliverCallback,consumer->{}); } }
等待接受消息 被队列Q1Q2接受到 接受队列:——> Q1绑定键:lazy.orange.elephant 被队列Q1Q2接受到 接受队列:——> Q1绑定键:quick.orange.rabbit 被队列Q1接受到 接受队列:——> Q1绑定键:quick.orange.fox
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class ReceiveLogsTopic02 { public static final String EXCHANGE_NAME= "topic_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"topic" ); String queueName = "Q2" ; channel.queueDeclare(queueName,false ,false ,false ,null ); channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit" ); channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#" ); System.out.println("等待接受消息" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println(new String (message.getBody(),"UTF-8" )); System.out.println("接受队列:——> " +queueName +"绑定键:" + message.getEnvelope().getRoutingKey()); }; channel.basicConsume(queueName,true ,deliverCallback,consumer->{}); } }
等待接受消息 被队列Q1Q2接受到 接受队列:——> Q2绑定键:lazy.orange.elephant 被队列Q2接受到 接受队列:——> Q2绑定键:lazy.brown.fox 被队列Q1Q2接受到 接受队列:——> Q2绑定键:quick.orange.rabbit 虽然满足两个绑定但只被队列Q2接受一次 接受队列:——> Q2绑定键:lazy.pink.rabbit 是四个单词但匹配Q2 接受队列:——> Q2绑定键:lazy.orange.male.rabbit
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class EmitLogTopic { private static final String EXCHANGE_NAME="topic_logs" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); Map<String,String> map = new HashMap <>(); map.put("quick.orange.rabbit" ,"被队列Q1Q2接受到" ); map.put("lazy.orange.elephant" ,"被队列Q1Q2接受到" ); map.put("quick.orange.fox" ,"被队列Q1接受到" ); map.put("lazy.brown.fox" ,"被队列Q2接受到" ); map.put("lazy.pink.rabbit" ,"虽然满足两个绑定但只被队列Q2接受一次" ); map.put("quick.brown.fox" ,"不匹配任何绑定不会被任何队列接收到会被丢弃" ); map.put("quick.orange.male.rabbit" ,"是四个单词不匹配任何绑定会被丢弃" ); map.put("lazy.orange.male.rabbit" ,"是四个单词但匹配Q2" ); for (Map.Entry<String, String> mapEntry : map.entrySet()) { String routingKey = mapEntry.getKey(); String message = mapEntry.getValue(); channel.basicPublish(EXCHANGE_NAME,routingKey,null ,message.getBytes("UTF-8" )); System.out.println("生产者发送消息: " + message); } } }
生产者发送消息: 是四个单词不匹配任何绑定会被丢弃
生产者发送消息: 不匹配任何绑定不会被任何队列接收到会被丢弃
生产者发送消息: 被队列Q1Q2接受到
生产者发送消息: 被队列Q2接受到
生产者发送消息: 被队列Q1Q2接受到
生产者发送消息: 被队列Q1接受到
生产者发送消息: 虽然满足两个绑定但只被队列Q2接受一次
生产者发送消息: 是四个单词但匹配Q2
死信队列 死信:顾名思义就是无法被消费的消息 ,一般情况下,product将消息投递到broker或者直接到queue里,consumer从queue取出消息,进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费 ,这样的消息如果没有后续的处理,就变成死信,有死信自然就有死信队列。
一般应用场景:为了保证订单业务中的消息数据不丢失,需要使用rabbitMQ的死信队列机制,当消息消费发生异常的时候,将消息投入死信队列中 ,还有比如说:用户在商场下单成功并点击去支付后在指定时间未支付时自动失效。
死信的来源
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中)
消息被拒绝(basic.reject或basic.nack)并且requeue= false;
死信实战 消息TTL过期 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class Product { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); AMQP.BasicProperties properties = new AMQP .BasicProperties() .builder().expiration("10000" ).build(); for (int i = 0 ; i < 11 ; i++) { String message = "info " + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan" ,properties,message.getBytes()); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class Consumer01 { private static final String NORMAL_EXCHANGE = "normal_exchange" ; private static final String DEAD_EXCHANGE = "dead_exchange" ; private static final String NORMAL_QUEUE = "normal_queue" ; private static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String,Object> argument = new HashMap <>(); argument.put("x-dead-lettle-exchange" ,DEAD_EXCHANGE); argument.put("x-dead-lettle-routing-key" ,"lisi" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,argument); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接受......" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println("Consumer 01接受到消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(NORMAL_QUEUE,true ,deliverCallback,consumer->{}); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Consumer02 { private static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); System.out.println("等待接受......" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println("Consumer 02接受到消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(DEAD_QUEUE,true ,deliverCallback,consumer->{}); } }
==解决进不去死信队列的原因了,就是我没给他把过期时间值传进去,导致出现问题。==
队列达到最大长度 消息生成者代码去掉TTL属性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class Consumer01 { private static final String NORMAL_EXCHANGE = "normal_exchange" ; private static final String DEAD_EXCHANGE = "dead_exchange" ; private static final String NORMAL_QUEUE = "normal_queue" ; private static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String,Object> argument = new HashMap <>(); argument.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); argument.put("x-dead-letter-routing-key" ,"lisi" ); argument.put("x-max-length" ,6 ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,argument); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接受......" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println("Consumer 01接受到消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(NORMAL_QUEUE,true ,deliverCallback,consumer->{}); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Product { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); for (int i = 0 ; i < 11 ; i++) { String message = "info " + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan" ,properties,message.getBytes()); System.out.println("生产者发送消息:" +message); } } }
消息被拒绝 一旦消费者拒绝接受,就会成为死信队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class Product { public static final String NORMAL_EXCHANGE = "normal_exchange" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); for (int i = 0 ; i < 11 ; i++) { String message = "info " + i; channel.basicPublish(NORMAL_EXCHANGE,"zhangsan" ,null ,message.getBytes()); System.out.println("生产者发送消息:" +message); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 public class Consumer01 { private static final String NORMAL_EXCHANGE = "normal_exchange" ; private static final String DEAD_EXCHANGE = "dead_exchange" ; private static final String NORMAL_QUEUE = "normal_queue" ; private static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT); Map<String,Object> argument = new HashMap <>(); argument.put("x-dead-letter-exchange" ,DEAD_EXCHANGE); argument.put("x-dead-letter-routing-key" ,"lisi" ); channel.queueDeclare(NORMAL_QUEUE,false ,false ,false ,argument); channel.queueDeclare(DEAD_QUEUE,false ,false ,false ,null ); channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan" ); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接受......" ); DeliverCallback deliverCallback = (consumer,message)->{ String message1 = new String (message.getBody(),"UTF-8" ); if (message1.equals("info 5" )){ System.out.println("此消息是被拒绝的" + message1); channel.basicReject(message.getEnvelope().getDeliveryTag(),false ); }else { System.out.println("Consumer 01接受到消息是:" + new String (message.getBody(),"UTF-8" )); channel.basicAck(message.getEnvelope().getDeliveryTag(),false ); } }; channel.basicConsume(NORMAL_QUEUE,false ,deliverCallback,consumer->{}); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public class Consumer02 { private static final String DEAD_EXCHANGE = "dead_exchange" ; private static final String DEAD_QUEUE = "dead_queue" ; public static void main (String[] args) throws IOException, TimeoutException { Channel channel = GetConnection.getChannel(); channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi" ); System.out.println("等待接受......" ); DeliverCallback deliverCallback = (consumer,message)->{ System.out.println("Consumer 02接受到消息是:" + new String (message.getBody(),"UTF-8" )); }; channel.basicConsume(DEAD_QUEUE,true ,deliverCallback,consumer->{}); } }
延迟队列 延迟队列,对列内部是有序的,最重要的特性就是体现在他的延时属性上,延时队列中的元素时希望在指定时间到了之后或者之前取出和处理,简单的来说,延迟队列就是用来存放需要在指定时间被处理的元素的队列。
延迟队列的使用场景
订单在十分钟内未支付则自动取消
新创建的店铺,如果在十天内没有上传商品,则自动发送消息提醒
用户注册成功后,如果没有在三天内登录则发送短信提醒消息
用户发起退款,如果在三天内没有得到处理则通知相关运营人员。
预定会员后,需要在预定的时间点前十分钟通知各个与会员人员参加会议。
延迟队列整合SpringBoot 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 <?xml version="1.0" encoding="UTF-8" ?> <project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelVersion > 4.0.0</modelVersion > <groupId > com.xh</groupId > <artifactId > SpringBoot_RabbitMQ</artifactId > <version > 0.0.1-SNAPSHOT</version > <name > SpringBoot_RabbitMQ</name > <description > SpringBoot_RabbitMQ</description > <properties > <java.version > 1.8</java.version > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <project.reporting.outputEncoding > UTF-8</project.reporting.outputEncoding > <spring-boot.version > 2.3.7.RELEASE</spring-boot.version > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > <version > 2.1.8.RELEASE</version > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > <scope > test</scope > </dependency > <dependency > <groupId > com.alibaba</groupId > <artifactId > fastjson</artifactId > <version > 1.2.83</version > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > 1.18.24</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger2</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > io.springfox</groupId > <artifactId > springfox-swagger-ui</artifactId > <version > 2.9.2</version > </dependency > <dependency > <groupId > org.springframework.amqp</groupId > <artifactId > spring-rabbit-test</artifactId > <scope > test</scope > </dependency > </dependencies > <dependencyManagement > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-dependencies</artifactId > <version > ${spring-boot.version}</version > <type > pom</type > <scope > import</scope > </dependency > </dependencies > </dependencyManagement > <build > <plugins > <plugin > <groupId > org.apache.maven.plugins</groupId > <artifactId > maven-compiler-plugin</artifactId > <version > 3.8.1</version > <configuration > <source > 1.8</source > <target > 1.8</target > <encoding > UTF-8</encoding > </configuration > </plugin > <plugin > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-maven-plugin</artifactId > <version > 2.3.7.RELEASE</version > <configuration > <mainClass > com.xh.springBoot.SpringBootRabbitMqApplication</mainClass > </configuration > <executions > <execution > <id > repackage</id > <goals > <goal > repackage</goal > </goals > </execution > </executions > </plugin > </plugins > </build > </project >
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket webApiConfig () { return new Docket (DocumentationType.SWAGGER_2) .groupName("webApi" ) .apiInfo(webApiInfo()) .select() .build(); } private ApiInfo webApiInfo () { return new ApiInfoBuilder () .title("rabbitMQ接口文档" ) .description("描述Rabbit微服务接口定义" ) .version("1.0" ) .contact(new Contact ("我喜欢" ,"127.0.0.1" ,"2844****670@qq.com" )) .build(); } }
队列TTL 创建两个队列QA和QB,两者队列TTL分被是10s,40s,然后在创建一个交换机X和死信交换机Y,他们的类型都是direct,创建一个死信队列QD,他们的绑定关系如下
代码文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 @Configuration public class TtlQueueConfig { public static final String X_EXCHANGE = "X" ; public static final String Y_DEAD_LETTER_EXCHANGE = "Y" ; public static final String A_QUEUE = "QA" ; public static final String B_QUEUE = "QB" ; public static final String Y_DEAD_LETTER_QUEUE = "QD" ; @Bean("xExchange") public DirectExchange xExchange () { return new DirectExchange (X_EXCHANGE); } @Bean("yExchange") public DirectExchange yExchange () { return new DirectExchange (Y_DEAD_LETTER_EXCHANGE); } @Bean("queueA") public Queue queueA () { Map<String,Object> map =new HashMap <>(); map.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key" ,"YD" ); map.put("x-message-ttl" ,10000 ); return QueueBuilder.durable(A_QUEUE).withArguments(map).build(); } @Bean("queueB") public Queue queueB () { Map<String,Object> map =new HashMap <>(); map.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key" ,"YD" ); map.put("x-message-ttl" ,40000 ); return QueueBuilder.durable(B_QUEUE).withArguments(map).build(); } @Bean("queueD") public Queue QueueD () { return QueueBuilder.durable(Y_DEAD_LETTER_QUEUE).build(); } @Bean public Binding queueABindingX (@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA" ); } @Bean public Binding queueBBindingX (@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB" ); } @Bean public Binding queueDBindingX (@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Slf4j @RestController @RequestMapping("/ttl") public class SellMessageController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}" ,new Date ().toString(),message); rabbitTemplate.convertAndSend("X" ,"XA" ,"消息来自ttl为10秒的消息队列:" + message); rabbitTemplate.convertAndSend("X" ,"XB" ,"消息来自ttl为40秒的消息队列:" + message); } }
http://127.0.0.1:8080/ttl/sendMessage/hello1
http://127.0.0.1:8080/ttl/sendMessage/hello2
上述存在一个不足之处,就是我们每次使用的时候,都需要增加一个新的时间需求,就需要新增一个队列 ,这里只是用了10s和40s两个时间选项,但万一我要一个小时呢,难不成还又要添加,这样太麻烦了。
延迟队列优化 使用QC来设置时间,用户自定义时间。
具体代码如下
TtlQueueConfig.java中添加
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static final String C_QUEUE = "QC" ; @Bean("queueC") public Queue queueC () { Map<String,Object> map = new HashMap <>(3 ); map.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); map.put("x-dead-letter-routing-key" ,"YD" ); return QueueBuilder.durable(C_QUEUE).withArguments(map).build(); } @Bean public Binding queueCBindingX (@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC" ); }
SellMessageController.java
1 2 3 4 5 6 7 8 9 10 11 @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendMsg (@PathVariable String message,@PathVariable String ttlTime) { log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}" , new Date ().toString(),ttlTime,message); rabbitTemplate.convertAndSend("X" ,"XC" ,message,msg -> { msg.getMessageProperties().setExpiration(ttlTime); return msg; } ); }
==注意我在写这个代码的时候,刚刚开始是敲错了一个字母,导致运行时生成的队列,然后调错的时候,需要生成的队列删除,然后重新创建,可以解决问题。。==
>http://127.0.0.1:8080/ttl/sendExpirationMsg/hello1/2000
>
>http://127.0.0.1:8080/ttl/sendExpirationMsg/hello2/20000
死信在做延迟的一个巨大缺陷,消息不会按时“死亡”,由于RabbitMQ只会检查第一个消息是否过期 ,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延迟时长很短,则第二个消息并不会优先得到执行。
这里出现一个问题,当出现两条消息的时候,我们发现,他居然是按照谁先来的,哪怕你时间短,但是你比另一个队列慢一步,你就只能老老实实排队了。
RabbitMQ插件实现延迟队列 这个就是解决上边的问题的,我们使用插件解决。
在官网上下载: https://www/rabbitmq.com/community-plugins.html,下载
rabbitmq_delayed_message_exchange-3.8.0
拷贝到
/usr/lib/rabbitmq/lib/rabbitmq_server-版本号/plugs
安装指令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后在linux系统下解压
然后安装rabbitmq-plugins enable rabbitmq_delayed_message_exchange
之后重启
systemctl restart rabbitmq-server(这个是你安装时候的名字)
这个时候不在是队列,而是交换机了。
没插件的时候
基于插件的
==注意,由于我是docker安装的,我没找到安装这个插件的位置,故而,这一点,我只能敲一下代码了解一下==
解决办法:
https://blog.csdn.net/DZP_dream/article/details/118391439
docker search rabbitmq
安装拉取容器:
docker run -dit --name rabbitmq -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest -p 15672:15672 -p 5672:5672 rabbitmq:management
https://www.rabbitmq.com/community-plugins.html
docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins
1 2 [root@local rabbitmq] root@3bb56f68570b:/
启动插件
root@3bb56f68570b:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
DelayedConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 @Configuration public class DelayedQueueConfig { public static final String DELAYED_QUEUE_NAME = "delayed.queue" ; public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange" ; public static final String DELAYED_ROUTING_KEY = "delayed.routing" ; @Bean public Queue delayedQueue () { return new Queue (DELAYED_QUEUE_NAME); } @Bean public CustomExchange delayedExchange () { Map<String,Object> map = new HashMap <>(); map.put("x-delayed-type" ,"direct" ); return new CustomExchange (DELAYED_EXCHANGE_NAME,"x-delayed-message" , true ,false ,map); } @Bean public Binding delayedQueueBingdingDelayedExchange ( @Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange ) { return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); }
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @GetMapping("/sendDelayMsg/{message}/{delayTime}") public void sendDelayedMsg (@PathVariable String message,@PathVariable Integer delayedTime) { log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayedTime:{}" , new Date ().toString(),delayedTime,message); rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY,message, msg -> { msg.getMessageProperties().setDelay(delayedTime); return msg; } ); }
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Component @Slf4j public class DelayedQueueConsumer { @RabbitListener(queues= DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayQueue (Message message) { String msg = new String (message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}" ,new Date ().toString(),msg); } }
http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/20000
http://127.0.0.1:8080/ttl/sendDelayMsg/come on bady1/2000
第二个消息被先消费掉了,符合预期。
总结: 延迟队列在需要延时的处理的场景下非常有用,使用rabbitmq来实现延迟队列可以很好的利用rabbitmq的特性,如消息的可靠发送,消息可靠投递,死信队列来保证消息至少被消费一次已经未被正常处理的消息不会被丢弃 ,另外,通过rabbitMQ集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延迟队列不可用或者信息丢失
当然,延迟队列还有很多其他选择,比如利用java的delayQueue,利用Redis的zset,利用Quartz或者利用kafka的时间轮,这些方式各有各的特点,看需要的适用的场景。
高级部分: 发布确认高级 在生产环境下中由于一些不明的原因,导致rabbitmq重启,在rabbitmq 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复,这个时候,我们如何保证rabbitmq的消息的可靠投递呢?
高级就是,当生产者发送消息给交换机,但是交换机收不到的时候,我们将消息放到缓存中,当交换机可以收到消息的时候,我们就把消息从缓存中移除。一般会出现的问题是,交换机收不到消息,二是队列收不到消息。
代码架构部分
配置文件
在spring.rabbitmq.publisher-confirm-type=correlated
confirm
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME= "confirm_exchange" ; public static final String CONFIRM_QUEUE_NAME = "confirm_queue" ; public static final String CONFIRM_ROUTING_KEY = "key1" ; @Bean("confirmExchange") public DirectExchange confirmExchange () { return new DirectExchange (CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue () { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange (@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 @Slf4j @Component public class Consumer { @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirmMessage (Message message) { String msg = new String (message.getBody()); log.info("接受到队列confirm.queue的消息: {} " ,msg); } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @RestController @RequestMapping("/confirm") @Slf4j public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate ; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message); } }
回调接口
一旦消息没有被接受我们采取回调接口去确定。
NONE
禁用发布确认模式,是默认值
CORRELATED
发布消息成功到交换器后会触发回调方法
SIMPLE
经过测试有两种效果,其一效果和CORRELATED值一样会触发回调方法。
其二在发布消息成功后采用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法,等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法,如果返回fasle则会关闭channel,则接下来无法发送消息到broker.
1 2 3 4 5 6 7 spring.application.name =SpringBoot_RabbitMQ spring.rabbitmq.host =114.132.77.86 spring.rabbitmq.publisher-confirm-type =correlated spring.rabbitmq.port =5672 spring.rabbitmq.username =guest spring.rabbitmq.password =guest
定义一个回调方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Component @Slf4j public class MyCallBack implements RabbitTemplate .ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData.getId()!=null ? correlationData.getId() : "" ; if (ack){ log.info("交换机已经收到的ID为: {}的消息" ,id); }else { log.info("交换机还未收到ID为:{}的消息,由于原因:{}" ,id,cause); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @RestController @RequestMapping("/confirm") @Slf4j public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate ; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { CorrelationData correlationData1 = new CorrelationData ("1" ); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData1); log.info("发送消息内容:{}" ,message + "key1" ); CorrelationData correlationData2 = new CorrelationData ("2" ); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData2); log.info("发送消息内容:{}" ,message + "key2" ); } }
回退消息 在开启了生成者确认机制情况下,交换机接收到消息后,会直接将消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的,那么如何让无法被路由的消息帮我想办法处理一下?最简单的就是通知我一下,我好方便处理,通过设置mandatory参数就可以在当消息传递过程中不可达目的时将消息返回给生成者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Component @Slf4j public class MyCallBack implements RabbitTemplate .ConfirmCallback,RabbitTemplate.ReturnCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setReturnCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id = correlationData.getId()!=null ? correlationData.getId() : "" ; if (ack){ log.info("交换机已经收到的ID为: {}的消息" ,id); }else { log.info("交换机还未收到ID为:{}的消息,由于原因:{}" ,id,cause); } } @Override public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息:{},被交换机{} 退回了,退回原因是:{},路由Key:{}" , new String (message.getBody()),exchange,replyText,routingKey); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @RestController @RequestMapping("/confirm") @Slf4j public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate ; @GetMapping("/sendMessage/{message}") public void sendMessage (@PathVariable String message) { CorrelationData correlationData1 = new CorrelationData ("1" ); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData1); log.info("发送消息内容:{}" ,message + "key1" ); CorrelationData correlationData2 = new CorrelationData ("2" ); rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY + "2" ,message + "key2" ,correlationData2); log.info("发送消息内容:{}" ,message + "key2" ); } }
回退处理
http://127.0.0.1:8080/confirm/sendMessage/你好
备份交换机 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME= "confirm_exchange" ; public static final String CONFIRM_QUEUE_NAME = "confirm_queue" ; public static final String CONFIRM_ROUTING_KEY = "key1" ; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange" ; public static final String BACKUP_QUEUE_NAME = "backup_queue" ; public static final String WARNING_QUEUE_NAME = "warning_queue" ; @Bean("confirmExchange") public DirectExchange confirmExchange () { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true ).withArgument("alternate-exchange" ,BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue () { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange (@Qualifier("confirmQueue") Queue confirmQueue, @Qualifier("confirmExchange") DirectExchange confirmExchange) { return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } @Bean("backupExchange") public FanoutExchange backupExchange () { return new FanoutExchange (BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue () { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue () { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding queueBindingBackupExchange (@Qualifier("backupQueue") Queue backupQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding queueBindingWarningExchange (@Qualifier("warningQueue") Queue warningQueue, @Qualifier("backupExchange") FanoutExchange backupExchange) { return BindingBuilder.bind(warningQueue).to(backupExchange); } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @Slf4j @Component public class WarningConsumer { @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarningMsg (Message message) { String msg = new String (message.getBody()); log.info("报警发现不可路由消息: {}" ,msg); } }
幂等性 用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等
消息被重复消费
消费者在消费MQ中的消息时,MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断,故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。
解决思路
MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如时间戳或者UUID或者订单消费者消费MQ中的消息也可利用MQ的该id来判断,或者可按自己的规则生成一个全局唯一id,每次消费消息时用该id先判断该消息是否已消费过。
消费端的幂等性保障 在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:a.唯一ID+指纹码机制,利用数据库主键去重,b.利用redis的原子性去实现
唯一lD+指纹码机制 指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。
Redis原子性 (重点选择) 利用redis执行setnx命令,天然具有幕等性。从而实现不重复消费。
优先级队列 在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis来存放的定时轮询,大家都知道 redis 只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
如何添加
a.控制台添加
b.队列中代码添加优先级
1 2 3 Map<String,Object> parame = new HashMap <>(); parame.put("x-max-prority" ,10 ); channel.queueDeclare("hello" ,true ,false ,false ,params);
c.消息中代码添加优先级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class Product { public static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("www.littlehei.fun" ); factory.setUsername("guest" ); factory.setPassword("guest" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); Map<String,Object> map = new HashMap <>(); map.put("x-max-priority" ,10 ); channel.queueDeclare(QUEUE_NAME,true ,false ,false ,map); for (int i = 0 ; i < 10 ; i++) { String message = "info" + i; if (i == 5 ){ AMQP.BasicProperties properties = new AMQP .BasicProperties().builder().priority(5 ).build(); channel.basicPublish("" ,QUEUE_NAME,properties,message.getBytes()); }else { channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); } } System.out.println("消息发送成功" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class Consumer { private static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("www.littlehei.fun" ); factory.setUsername("guest" ); factory.setPassword("guest" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message)->{ System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback = consumerTag -> { System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
d.注意事项
要让队列实现优先级需要做的事情有如下事情:
队列需要设置优先级队列,消息需要设置消息的优先级,消费者需要等待消息已经发送到队列中才去消费,因为这样才有机会对消息进行排序
惰性队列 RabbitMQ从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、若机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。+
当消息大量过来,我们短时间处理不过来,这个时候我们采用惰性队列了
队列具备两种模式:default 和lazy。默认的为default模式,在3.6.0之前的版本无需做任何变更。lazy模式即为惰性队列的模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置,如果一个队列同时使用这两种方式设置的话,那么Policy的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。 在队列声明的时候可以通过“x-queue-mode”参数来设置队列的模式,取值为“default’和lazy”。 下面示例中演示了一个惰性队列的声明细节:
Map args =new HashMap();
args.put(”x-queue-mode”,”lazy”);
channel.queueDeclare(”myqueue”,false,false,false,args);
在发送1百万条消息,每条消息大概占1KB的情况下,普通队列占用内存是1.2GB,而惰性队列仅仅占用1.5MB 。
集群部分: clustering 搭建集群 如果是本地虚拟机的话,我们就将几台虚拟机进行克隆,造成集群环境。
然后选择一台服务器作为集群,其他两台服务器加入进去。
修改3台机器的主机名称
vim /etc/hostname
vim /etc/host
进入后更该就行
ip地址 node1
ip地址 node2
ip地址 node3
配置各个节点的host文件,让各个节点都能相互识别对方。
确保各个节点的cookie文件使用的是同一个值。
scp/var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmg/.erlang.cookie
scp/var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmg/.erlang.cookie
启动RabbitMQ服务,顺带启动Erlang虚拟机和rabbitMQ应用服务(在三台节点上分布执行命令)
rabbitmq-server-detached
在节点2执行
rabbitmqctl stop_app
(rabbitmqctl stop会将Erlang虚拟机关闭,rabbitmqct stop_app 只关闭RabbitMQ服务)
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app(只启动应用服务)
在节点3执行
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2
rabbitmqctl start_app(只启动应用服务)
集群状态
rabbitmqctl cluster_status
需要重新设置用户角色
创建账号
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin adminstractor
设置用户权限
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
此时如图,表示存在集群;
解除集群节点(node2,和node3分别执行)
1 2 3 4 5 6 7 8 9 10 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app rabbitmqctl cluster_status rabbitmqetl forget_cluster_node rabbit@node2(node1机器上执行)
镜像队列 如果RabbitMQ集群中只有一个Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable属性也设置为true,但是这样仍然无法避免由于缓存导致的问题:因为消息在发送之后和被写入磁盘井执行刷盘动作之间存在一个短暂却会产生问题的时间窗,通过publisherconfirm机制能够确保客户端知道哪些消息己经存入磁盘,尽管如此,一般不希望遇到因单点故障导致的服务不可用。 引入镜像队列(Miror Queue)的机制,可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。
搭建步骤
启动三台集群节点
随便找一个节点添加policy
单点无法连接
采用负载均衡,Haproxy + keepalive 实现高可用负载均衡。
FederationExchange(联邦交换机) (broker北京),(broker 深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小,(Client北京)可以迅速将消息发送至exchangeA中,就算在开启了publisherconfirm机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client 深圳)需要向 exchangeA发送消息,那么(Client深圳)(broker北京)之间有很大的网络延迟,(Client 深圳)将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm机制或者事务机制的情况下,(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认信息,进而必然造成这条发送线程的性能降低,甚至造成一定程度上的阻塞。
将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,总不见得将所有业务全部部署在一个机房,那么容灾又何以实现? 这里使用Federation 插件就可以很好地解决这个问题.
由于两个地区的服务器比较远,这个时候就存在一种服务延迟情况,我们的federation交换机就是解决这个问题的。
搭建步骤:
需要保证每台节点单独运行
在每台机器上开启federation相关插件
rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq federation_management
每台服务器上都装一下。
两个指令都需要安装。
原理图:(高山流水,只能从上游流向下游)。
注意的是,必须从创建好下游节点的交换机,才能够将消息从上游传到下游。上游的消息在下游不能接收,那么这个消息就无法传递到下游。
在downstean(node2)配置upstream(node1);
添加规则
联邦队列 Shovel 使用它的原因 Federation 具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)。作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker上。Shovel可以翻译为“铲子” 是一种比较形象的比喻,这个”铲子”可以将消息从一方“铲子”另一方。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理。 搭建步骤 1.开启插件(需要的机器都开启)
rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management