RabbitMQ
简介
MQ(message queue)本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message而已,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务,使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务
功能
分类
- ActiveMQ
- 优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
- 缺点:官方社区现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用
- Kafka
- 主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能, 肯定是首选kafka
- RocketMQ
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ在稳定性上可能更值得信赖,如果你的业务有上述并发场景,建议可以选择RocketMQ
- RabbitMQ
- 结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ
四大核心
- 生产者:产生数据发送消息的程序
- 交换机:是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
- 队列:是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
- 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者
1 2 3 4
| MQ |生产者| → |交换机 → 队列| → |消费者| 绑定关系 | ↘ 队列| → |消费者|
|
工作原理
1 2 3 4 5 6 7 8 9 10
| |Connection| |Broker(RabbitMQ)| |Connection| +----------+ +----------------+ +----------+ | | | ↗ Queue | | | Producer→| Channel | |Exchange | | Channel |→Consumer | | | ↘ Queue | | | | Channel |→| |→| Channel | | | | ↗ Queue | | | Producer→| Channel | |Exchange | | Channel |→Consumer | | | ↘ Queue | | |
|
- Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
- Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时可以划分出多个vhost,每个用户在自己的vhost创建 exchange/queue等
- Connection:publisher/consumer和broker之间的TCP连接
- Channel:如果每一次访问RabbitMQ都建立一个Connection在消息量大的时候建立TCPConnection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel 进行通讯,AMQP method包含了channel id帮助客户端和message broker识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCPconnection的开销
- Exchange:message到达broker的第一站,根据分发规则匹配查询表中的routing key,分发消息到queue中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
- Queue:消息最终被送到这里等待consumer取走
- Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
配置
- 启动服务:
systemctl start rabbitmq-server
- 查看服务状态:
/sbin/service rabbitmq-server status
- 停止服务(选择执行):
/sbin/service rabbitmq-server stop
- 开启web管理插件:
rabbitmq-plugins enable rabbitmq_management
- 用默认账号密码(guest)访问地址 http://mousse.cc:15672/出现权限问题
- 创建账号:
rabbitmqctl add_user admin 123
- 设置用户角色:
rabbitmqctl set_user_tags admin administrator
- 设置用户权限:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
- 用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
- 当前用户和角色:
rabbitmqctl list_users
HelloWorld
步骤
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.13.1</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.11.0</version> </dependency> </dependencies>
|
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class Producer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("mousse.cc"); factory.setUsername("admin"); factory.setPassword("root"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送完毕"); }
}
|
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class Consumer {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("mousse.cc"); factory.setUsername("admin"); factory.setPassword("root"); Channel channel = factory.newConnection().createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费被中断");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }
}
|
Work Queues
工作队列又称任务队列,主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
1 2 3 4 5
| ↗接收消息→ 工作线程 生产者 →大量消息→ 队列 →接收消息→ 工作线程 ↘接收消息→ 工作线程
|
轮训分发消息
抽取工具类
1 2 3 4 5 6 7 8 9 10 11 12
| public class RabbitMqUtils {
public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("mousse.cc"); factory.setUsername("admin"); factory.setPassword("root"); return factory.newConnection().createChannel(); }
}
|
启动两个工作线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Worker1 {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息: " + new String(message.getBody())); CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消息者取消消费接口回调逻辑"); System.out.println("C1等待接收消息"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); }
}
|
启动一个发送线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class Task {
public static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息完成: " + message); } }
}
|
消息应答
消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ才可以把该消息删除,保证了消息在发送过程中不丢失
自动应答
- 消息发送后立即被认为已经传送成功
- 这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前消费者那边出现连接或者channel关闭,那么消息就丢失了
- 没有对传递的消息数量进行限制
- 这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
手动应答
Multiple
- true:代表批量应答channel上未应答的消息
- 比如说channel上有传送tag的消息5,6,7,8当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答
- false:同上面相比只会应答tag=8的消息,5/6/7这三个消息依然不会被确认收到消息应答
消息自动重新入队
- 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失)导致消息 未发送ACK确认,RabbitMQ将了解到消息未完全处理并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡也可以确保不会丢失任何消息
持久化
默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化,这时候即使重启rabbitmq队列也依然存在
队列持久化
1 2
| boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
|
- 如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误
消息实现持久化
1 2 3
| channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
|
不公平分发
- 轮训分发在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,RabbitMQ并不知道这种情况它依然很公平的进行分发
- 为了避免这种情况可以设置参数:channel.basicQos(1);
预取值
- 限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题
- 通过使用basicQos方法设置“预取计数”值来完成的,该值定义通道上允许的未确认消息的最大数量
- 虽然自动应答传输消息速率是最佳的,但在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗
发布确认
原理
生产者将信道设置成confirm模式:一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始)一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置 basic.ack的multiple域表示到这个序列号之前的所有消息都已经得到了处理
confirm模式最大的好处在于他是异步的:一旦发布一条消息生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后生产者应用便可以通过回调方法来处理该确认消息
如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条 nack消息,生产者应用程序同样可以在回调方法中处理该nack消息
策略
发布确认默认是没有开启的,如果要开启需要调用方法confirmSelect,每当你要想使用发布 确认,都需要在channel上调用该方法
单个确认发布
- 同步等待确认,简单,但吞吐量非常有限
- 一种同步确认发布的方式:发布一个消息之后只有它被确认发布,后续的消息才能继续发布
- 个最大的缺点:发布速度特别的慢
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void publishMessageIndividually() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] message = String.valueOf(i).getBytes(StandardCharsets.UTF_8); channel.basicPublish("", queueName, null, message); if (channel.waitForConfirms()) { System.out.println("消息发送成功: " + i); } } long finish = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条单独确认消息总用时: " + (finish - start) + "ms"); }
|
批量确认发布
- 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条
消息出现了问题
- 先发布一批消息然后一起确认可以极大地提高吞吐量
- 缺点:当发生故障导致发布出现问题时不知道是哪个消息出现问题,必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息
- 这种方案仍然是同步的,也一样阻塞消息的发布
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void publishMessageBatch() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); long start = System.currentTimeMillis(); int batchSize = 100; for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] message = String.valueOf(i).getBytes(StandardCharsets.UTF_8); channel.basicPublish("", queueName, null, message); if ((i + 1) % batchSize == 0) { if (channel.waitForConfirms()) { System.out.println("消息发送成功: " + i); } } } long finish = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条批量确认消息总用时: " + (finish - start) + "ms"); }
|
异步确认发布
- 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
- 性价比最高,无论是可靠性还是效率都高, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect(); long start = System.currentTimeMillis(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> System.out.println("已确认的消息: " + deliveryTag); ConfirmCallback nackCallback = (deliveryTag, multiple) -> System.out.println("未确认的消息: " + deliveryTag);
channel.addConfirmListener(ackCallback, nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { byte[] message = String.valueOf(i).getBytes(StandardCharsets.UTF_8); channel.basicPublish("", queueName, null, message); } long finish = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条异步确认消息总用时: " + (finish - start) + "ms"); }
|
处理异步未确认消息
- 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public static void publishMessageAsync() throws Exception { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, false, false, false, null); channel.confirmSelect();
ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback ackCallback = (deliveryTag, multiple) -> { if (multiple) { ConcurrentNavigableMap<Long, String> confirmedMap = outstandingConfirms.headMap(deliveryTag); confirmedMap.clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("已确认的消息: " + deliveryTag); }; ConfirmCallback nackCallback = (deliveryTag, multiple) -> { String message = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息: " + deliveryTag + "\tTag: " + message); };
channel.addConfirmListener(ackCallback, nackCallback); long start = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String message = String.valueOf(i); channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); outstandingConfirms.put(channel.getNextPublishSeqNo(), message); } long finish = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条异步确认消息总用时: " + (finish - start) + "ms"); }
|
交换机
消息传达给多个消费者,这种模式称为“发布/订阅”
1 2 3 4 5 6 7 8 9
| ↗ 消费者C1 生产者 → 队列 → 消费者C2 ↘ 消费者C3
↗绑定→ 队列 → 消费者C1 生产者 → 交换机 →绑定→ 队列 → 消费者C2 ↘绑定→ 队列 → 消费者C3
|
Exchange
生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange)
交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列
类型
- 直接:direct
- 主题:topic
- 标题:headers
- 扇出:fanout
无名exchange
- 空字符串表示默认或无名称交换机
- 消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的,如果它存在的话
临时队列
不带有持久化的队列
可以创建一个具有随机名称的队列或能让服务器为我们选择一个随机队列名称那就更好了,一旦断开了消费者的连接队列将被自动删除
1 2
| String queueName = channel.queueDeclare().getQueue();
|
绑定
binding是exchange和queue之间的桥梁,它告诉我们exchange和那个队列进行了绑定关系
Fanout
将接收到的所有消息广播到它知道的所有队列中。系统中默认有的exchange类型
routingKey相同
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发出消息: " + message); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class ReceiveLogs1 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("C1等待接收消息"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); }
}
|
Direct
队列只对它绑定的交换机的消息感兴趣,绑定之后的 意义由其交换类型决定
工作方式:消息只去到它绑定的routingKey队列中去
routingKey不相同
多重绑定
- exchange的绑定类型是direct,但是它绑定的多个队列的key都相同
- 表现的就和fanout有类似
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public class DirectLogs {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String message = scanner.next(); if (message.contains("info")) { channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes(StandardCharsets.UTF_8)); } if (message.contains("warning")) { channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes(StandardCharsets.UTF_8)); } if (message.contains("error")) { channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8)); } System.out.println("生产者发出消息: " + message); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class ReceiveLogsDirect1 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception { String queueName = "console"; Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息" + new String(message.getBody(), StandardCharsets.UTF_8)); System.out.println("C1准备接收消息"); channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ReceiveLogsDirect2 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws Exception { String queueName = "disk"; Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(queueName, false, false, false, null); channel.queueBind(queueName, EXCHANGE_NAME, "error"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息" + new String(message.getBody(), StandardCharsets.UTF_8)); System.out.println("C2准备接收消息"); channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {}); }
}
|
Topic
routing_key不能随意写,它必须是一个单词列表,以点号分隔开,比如”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”,单词列表最多不能超过 255字节
1 2 3 4 5 6 7 8 9 10 11 12 13
| ↗*.orange.*→ Q1 → C1 Producer → Exchange →*.*.rabbit↘ ↘lazy.#→→→→→→ Q2 → C2
|
当一个队列绑定键是#,那么这个队列将接收所有数据,类似fanout
若队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> bindingKeyMap = new HashMap<>(); bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到"); bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到"); bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到"); bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到"); bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次"); bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2"); for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) { String routingKey = bindingKeyEntry.getKey(); String message = bindingKeyEntry.getValue(); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("生产者发送消息: " + message + "\troutingKey: " + routingKey); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class ReceiveLogsTopic1 {
private static final String EXCHANGE_NAME = "topic_logs"; private static final String QUEUE_NAME = "Q1";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println( "接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8) + "\t接收队列: " + QUEUE_NAME + "\t绑定键: " + message.getEnvelope().getRoutingKey() ); System.out.println("C1准备接收消息"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class ReceiveLogsTopic2 {
private static final String EXCHANGE_NAME = "topic_logs"; private static final String QUEUE_NAME = "Q2";
public static void main(String[] args) throws Exception { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println( "接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8) + "\t接收队列: " + QUEUE_NAME + "\t绑定键: " + message.getEnvelope().getRoutingKey() ); System.out.println("C2准备接收消息"); channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
}
|
死信队列
某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列
为了保证订单业务的消息数据不丢失需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中
- 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
来源
1 2 3 4 5 6
| Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key", routing_key); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
|
消息TTL过期
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class Producer {
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); for (int i = 0; i < 10; i++) { String message = "info" + (i + 1); AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息: " + message); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public class Consumer1 {
private static final String NORMAL_EXCHANGE = "normal_exchange"; private static final String DEAD_EXCHANGE = "dead_exchange"; private static final String NORMAL_QUEUE = "normal_queue"; private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("C1接收消息: " + new String(message.getBody(), StandardCharsets.UTF_8)); System.out.println("C1等待接收消息"); channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> { }); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class Consumer2 {
private static final String DEAD_EXCHANGE = "dead_exchange"; private static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception { Channel channel = RabbitUtils.getChannel(); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEAD_QUEUE, false, false, false, null); channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("C2接收消息: " + new String(message.getBody(), StandardCharsets.UTF_8)); System.out.println("C2等待接收消息"); channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> { }); } }
|
队列达到最大长度
1 2
| arguments.put("x-max-length", 6);
|
消息被拒绝
- basic.reject或basic.nack并且requeue=fals
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| DeliverCallback deliverCallback = (consumerTag, message) -> { String messageBody = new String(message.getBody(), StandardCharsets.UTF_8); if (Integer.parseInt(messageBody.substring(4)) % 2 == 0) { channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("C1接收消息: " + messageBody); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; System.out.println("C1等待接收消息");
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> { });
|
延迟队列
延时队列就是用来存放需要在指定时间被处理的使用场景元素的队列
使用场景:订单在十分钟之内未支付则自动取消
TTL
方式1:针对每条消息设置 TTL
1
| AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
|
方式2:在创建队列的时候设置
1
| arguments.put("x-message-ttl", 100000);
|
SpringBoot
依赖
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.5.4</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.5.4</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.78</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> <scope>provided</scope> </dependency>
<dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>3.0.0</version> </dependency>
<dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>3.0.0</version> </dependency>
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <version>2.3.10</version> <scope>test</scope> </dependency> </dependencies>
|
配置文件
1 2 3 4
| spring.rabbitmq.host=mousse.cc spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=root
|
Swagger配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration @EnableSwagger2 public class SwaggerConfig {
@Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2).groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); }
private ApiInfo webApiInfo() { return new ApiInfoBuilder().title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义").version("1.0") .contact(new Contact("Tom", "mousse.cc", "tom@mousse.cc")) .build(); } }
|
死信实现
缺陷
- RabbitMQ只会检查第一个消息是否过期
- 如果过期则丢到死信队列中一个消息的延时时长很长而第二个消息的延时时长很短,第二个消息并不会优先得到执行
- 需要通过插件弥补
队列 TTL
创建两个队列QA和QB,两者队列TTL分别设置为10s和40s,然后在创建一个交换机X和死信交换机 Y,它们的类型都是direct
1 2 3
| ↗XA→ QA(10s) →YD↘ P → X Y →YD→ QD → C ↘XB→ QB(40s) →YD↗
|
配置文件类代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Configuration @EnableSwagger2 public class SwaggerConfig {
@Bean public Docket webApiConfig() { return new Docket(DocumentationType.SWAGGER_2).groupName("webApi") .apiInfo(webApiInfo()) .select() .build(); }
private ApiInfo webApiInfo() { return new ApiInfoBuilder().title("rabbitmq 接口文档") .description("本文档描述了 rabbitmq 微服务接口定义").version("1.0") .contact(new Contact("Tom", "mousse.cc", "tom@mousse.cc")) .build(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71
| @Configuration public class TtlQueueConfig {
public static final String X_EXCHANGE = "X"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String DEAD_LETTER_QUEUE = "QD";
@Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); }
@Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }
@Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key", "YD"); arguments.put("x-message-ttl", 10000); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); }
@Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key", "YD"); arguments.put("x-message-ttl", 40000); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); }
@Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); }
@Bean public Binding queueABindingX() { return BindingBuilder.bind(queueA()).to(xExchange()).with("XA"); }
@Bean public Binding queueBBindingX() { return BindingBuilder.bind(queueB()).to(xExchange()).with("XB"); }
@Bean public Binding queueDBindingY() { return BindingBuilder.bind(queueD()).to(yExchange()).with("YD"); }
}
|
消息生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Slf4j @RestController @RequestMapping("/ttl")
public class SendMsgController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}") public void sendMessage(@PathVariable String message) { log.info("{}: 发送一条信息给两个TTL队列\"{}\"", new Date(), message); rabbitTemplate.convertAndSend(X_EXCHANGE, "XA", "消息来自TTL=10s的队列: " + message); rabbitTemplate.convertAndSend(X_EXCHANGE, "XB", "消息来自TTL=40s的队列: " + message); }
}
|
消息消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Slf4j @Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = DEAD_LETTER_QUEUE) public void receiveD(Message message) throws Exception { String msgBody = new String(message.getBody(), StandardCharsets.UTF_8); log.info("{}: 收到死信队列的消息\"{}\"", new Date(), msgBody); }
}
|
延时队列优化
1 2 3 4
| ↗XA→ QA(10s) →YD↘ P → X →XB→ QB(40s) →YD→ Y →YD→ QD → C ↘XC→ QC(XXs) →YD↗
|
缺陷
- RabbitMQ只会检查第一个消息是否过期
- 如果过期则丢到死信队列中一个消息的延时时长很长而第二个消息的延时时长很短,第二个消息并不会优先得到执行
- 需要通过插件弥补
配置文件类代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public static final String QUEUE_C = "QC";
@Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(3); arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); }
@Bean public Binding queueCBindingX() { return BindingBuilder.bind(queueC()).to(yExchange()).with("XC"); }
|
消息生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @GetMapping("/sendExpMsg/{ttl}/{msg}") public void sendMessage(@PathVariable String ttl, @PathVariable String msg) { log.info("{}: 发送一条信息给时长为{}ms的TTL队列\"{}\"", new Date(), ttl, msg); rabbitTemplate.convertAndSend( X_EXCHANGE, "XC", "消息来自TTL=" + ttl + "ms的队列: " + msg, msgVal -> { msgVal.getMessageProperties().setExpiration(ttl); return msgVal; }); }
|
插件实现
在官网上https://www.rabbitmq.com/community-plugins.html下载 rabbitmq_delayed_message_exchange插件,解压放置到RabbitMQ的插件目录
进入RabbitMQ的安装目录下的plgins目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange
1 2 3 4 5 6 7 8 9
| 生产者 → 交换机 → 队列[TTL] ↓ 死信交换机 ↓ 死信队列 → 消费者
生产者 → 交换机[延迟] → 队列 → 消费者
|
配置文件类代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| @Configuration
public class DelayedQueueConfig {
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange"; public static final String DELAYED_QUEUE_NAME = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.routing_key";
@Bean public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
}
@Bean public Queue delayedQueue() { return QueueBuilder.durable(DELAYED_QUEUE_NAME).build(); }
@Bean public Binding delayedQueueBindingDelayedExchange() { return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs(); }
}
|
消息生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| @GetMapping("/sendDelayMsg/{msg}/{delay}") public void sendMessage(@PathVariable(value = "msg") String msg, @PathVariable(value = "delay") Integer delay) { log.info("{}: 发送一条信息给时长为{}ms的delay队列, 消息为\"{}\"", new Date(), delay, msg); rabbitTemplate.convertAndSend( DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, "消息来自delay=" + delay + "ms的队列, 消息为\"" + msg + "\"", msgVal -> { msgVal.getMessageProperties().setDelay(delay); return msgVal; }); }
|
消息消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Slf4j @Component
public class DelayedQueueConsumer {
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME) public void receiveDelayQueue(Message msg) { String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8); log.info("{}: 收到延迟队列的消息\"{}\"", new Date(), msgBody); }
}
|
总结
延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用 RabbitMQ的特性,如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。通过RabbitMQ集群的特性可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失
延时队列还有很多其它选择,比如Java的DelayQueue,Redis的zset,Quartz或利用kafka的时间轮,这些方式各有特点
发布确认高级
在生产环境中由于一些不明原因导致RabbitMQ重启,在重启期间生产者消息投递失败导致消息丢失,需要手动处理和恢复
SpringBoot
配置文件
- 在配置文件当中需要添加:
spring.rabbitmq.publisher-confirm-type=correlated
- NONE:禁用发布确认模式,默认值
- CORRELATED:发布消息成功到交换器后会触发回调方法
- SIMPLE:
- 效果1:和CORRELATED值一样会触发回调方法
- 效果2:在发布消息成功后使用rabbitTemplate调用waitForConfirms或 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑
- 要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
1 2 3 4 5
| spring.rabbitmq.host=mousse.cc spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=root spring.rabbitmq.publisher-confirm-type=correlated
|
添加配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean public Exchange confirmExchange() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).build(); }
@Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Binding confirmQueueBindingConfirmExchange() { return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY).noargs(); }
}
|
消息生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Slf4j @RestController @RequestMapping("/confirm")
public class ProduceController {
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{msg}") public void sendMsg(@PathVariable String msg) { log.info("{}: 发送一条信息: {}", new Date(), msg); rabbitTemplate.convertAndSend( ConfirmConfig.CONFIRM_EXCHANGE_NAME, ConfirmConfig.CONFIRM_ROUTING_KEY, msg, new CorrelationData("1") ); }
}
|
回调接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j
@Component public class MyCallBack implements RabbitTemplate.ConfirmCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("{}: 交换机成功接收到ID为{}的消息", new Date(), id); } else { log.info("{}: 交换机还未接收到ID为{}的消息, 原因: {}", new Date(), id, cause); } }
}
|
消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Slf4j @Component public class ConfirmConsumer {
@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME) public void receiveConfirm(Message msg) { log.info( "{}: 收到队列" + msg.getMessageProperties().getConsumerQueue() + "的消息: {}", new Date(), new String(msg.getBody(), StandardCharsets.UTF_8) ); }
}
|
回退消息
Mandatory
- 在仅开启了生产者确认机制的情况下交换机接收到消息后会直接给消息生产者发送确认消息,如果发现该消息不可路由那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的
- 通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者
1 2 3 4 5 6
| spring.rabbitmq.host=mousse.cc spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=root spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true
|
回调接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Slf4j
@Component public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String id = correlationData != null ? correlationData.getId() : ""; String exchange = correlationData != null && correlationData.getReturned() != null ? correlationData.getReturned().getExchange() : ""; if (ack) { log.info("{}: 交换机{}成功接收到ID为{}的消息", new Date(), exchange, id); } else { log.info("{}: 交换机{}还未接收到ID为{}的消息, 原因: {}", new Date(), exchange, id, cause); } }
@Override public void returnedMessage(ReturnedMessage returned) { log.info( "{}: 消息{}被交换机{}退回, 原因: {}, 路由Key: {}", new Date(), new String(returned.getMessage().getBody(), StandardCharsets.UTF_8), returned.getExchange(), returned.getReplyText(), returned.getRoutingKey() ); } }
|
备份交换机
1 2 3 4 5 6 7 8
| type=direct producer → confirm.exchange → confirm.queue → confirm consumer ↓ 无法投送的消息将发送给备份交换机 ↓ ↓ ↗ backup.queue → backup.exchange type=fanout ↘ warning.queue → warning consumer
|
修改配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange"; public static final String BACKUP_EXCHANGE_NAME = "backup.exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm.queue"; public static final String BACKUP_QUEUE_NAME = "backup.queue"; public static final String WARNING_QUEUE_NAME = "warning.queue"; public static final String CONFIRM_ROUTING_KEY = "key1";
@Bean public DirectExchange confirmExchange() { return ExchangeBuilder .directExchange(CONFIRM_EXCHANGE_NAME) .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME) .build(); }
@Bean public FanoutExchange backupExchange() { return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE_NAME).build(); }
@Bean public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); }
@Bean public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); }
@Bean public Queue warningQueue() { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); }
@Bean public Binding confirmQueueBindingConfirmExchange() { return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY); }
@Bean public Binding backupQueueBindingBackupExchange() { return BindingBuilder.bind(backupQueue()).to(backupExchange()); }
@Bean public Binding warningQueueBindingBackupExchange() { return BindingBuilder.bind(warningQueue()).to(backupExchange());
}
}
|
报警消费者
1 2 3 4 5 6 7 8 9 10 11
| @Slf4j @Component
public class WarningConsumer {
@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME) public void receiveWarningMsg(Message msg) { log.info("{}: 发现不可路由消息: {}", new Date(), new String(msg.getBody(), StandardCharsets.UTF_8)); }
}
|
注意事项
重新启动项目的时候需要把原来的confirm.exchange删除因为我们修改了其绑定属性,不然报错
mandatory参数与备份交换机可以一起使用的时候,备份交换机优先级高
其他知识点
幂等性
用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。在单应用系统中只需要把数据操作放入事务中即可,发生错误立即回滚,但是在响应客户端的时候也有可能出现网络中断或者异常等等
消息重复消费
- 消费者在消费MQ中的消息时MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断, 故MQ未收到确认信息,该条消息会重新发给其他的消费者或者在网络重连后再次发送给该消费者,实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息
消费端的幂等性保障
- 在海量订单生成的业务高峰期生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性:即使收到了一样的消息,消息也永远不会被消费多次
- 业界主流的幂等性有两种操作:
- 唯一ID+指纹码机制,利用数据库主键去重
- 利用redis的原子性去实现
唯一ID+指纹码机制
- 指纹码:一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存
- 在数据库中优势就是实现简单就一个拼接,然后查询判断是否重复,劣势就是在高并发时如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,不是最推荐
Redis原子性
优先级队列
0-255,越大优先级越高,(建议0-10)
使用场景:订单催付
- redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级
添加步骤
队列中代码添加优先级
1 2 3
| Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-priority", 10);
|
消息中代码添加优先级
1
| AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(randomNumber).build();
|
惰性队列
惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时惰性队列就很有必要了
默认情况下生产者将消息发送到RabbitMQ的时,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时会将内存中的消息换页至磁盘中,这个操作会耗费较长时间也会阻塞队列的操作进而无法接收新的消息
在发送1百万条消息,每条消息大概占1KB的情况下普通队列占用内存1.2GB,惰性队列仅 占用1.5MB
两种模式
- 在队列声明的时候可以通过
x-queue-mode
参数来设置队列的模式
- default:默认,在3.6.0之前的版本无需做任何变更
- lazy:惰性队列模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置
- 如果一个队列同时使用这两种方式设置,那么Policy的方式具备更高的优先级
- 如果要通过声明的方式改变已有队列的模式的话只能先删除队列再重新声明一个新的
集群
步骤
修改机器的主机名称:vim /etc/hostname
配置各个节点的hosts文件,让各个节点都能互相识别对方:vim /etc/hosts
1 2 3
| 10.211.55.74 node1 10.211.55.75 node2 10.211.55.76 node3
|
以确保各个节点的cookie文件使用的是同一个值
- 在node1上执行远程操作命令:
scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
启动RabbitMQ服务,顺带启动Erlang虚拟机和RbbitMQ应用服务(在三台节点上分别执行):rabbitmq-server -detached
在节点2执行:
rabbitmqctl stop_app
rabbitmqctl stop
会将Erlang虚拟机关闭
rabbitmqctl stop_app
只关闭RabbitMQ服务
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
在节点3执行:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node2 rabbitmqctl start_app
集群状态:rabbitmqctl cluster_status
需要重新设置用户
- 创建账号:
rabbitmqctl add_user admin 123
- 设置用户角色:
rabbitmqctl set_user_tags admin administrator
- 设置用户权限:
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
解除集群节点(node2和node3机器分别执行):
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl cluster_status
rabbitmqctl forget_cluster_node rabbit@node2
(node1机器上执行)
镜像队列
引入镜像队列(Mirror Queue)的机制可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性
搭建
- 在节点添加policy
- Name:策略名,没有意义只有语意
- Pattern:正则表达式,匹配此规则的交换机/队列名才会使用策略
Haproxy+Keepalive
实现高可用负载均衡
1 2 3 4 5 6 7
| VIP ↙ ↘ HAProxy node11 node21 负载均衡 Keepalived ↓ ↓ +-+---+---+-+ ↓ ↓ ↓ RabbitMQ node1 node2 node3
|
HAProxy
- 提供高可用性、负载均衡及基于TCPHTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案
- 搭建步骤:
- 下载haproxy(在node1和node2):
yum -y install haproxy
- 修改node1和node2的haproxy.cfg
vim /etc/haproxy/haproxy.cfg
- 需要修改红色 IP 为当前机器 IP
- 在两台节点启动haproxy
haproxy -f /etc/haproxy/haproxy.cfg
ps -ef | grep haproxy
- 访问地址:http://10.211.55.71:8888/stats
Keepalived
- 实现双机(主备)热备
- 如果前面配置的HAProxy主机突然宕机或者网卡失效,那么虽然RbbitMQ集群没有任何故障但是对于外界的客户端来说所有的连接都会被断开结果将是灾难性的
- 为了确保负载均衡服务的可靠性同样显得十分重要,这里就要引入Keepalived它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移
- 搭建步骤:
- 下载keepalived:
yum -y install keepalived
- 节点node1配置文件:
vim /etc/keepalived/keepalived.conf
- 把资料里面的keepalived.conf修改之后替换
- 节点node2配置文件
- 需要修改global_defs的router_id,如nodeB
- 其次要修改vrrp_instance_VI中state为”BACKUP”
- 最后要将priority设置为小于100的值
- 添加haproxy_chk.sh
- 为了防止HAProxy服务挂掉之后Keepalived还在正常工作而没有切换到Backup 上,所以这里需要编写一个脚本来检测HAProxy务的状态,当HAProxy服务挂掉之后该脚本会自动重启HAProxy的服务,如果不成功则关闭Keepalived服务,这样便可以切换到Backup继续工作
vim /etc/keepalived/haproxy_chk.sh
(可以直接上传文件)
- 修改权限:
chmod 777 /etc/keepalived/haproxy_chk.sh
- 启动keepalive命令(node1和node2启动):
systemctl start keepalived
- 观察Keepalived的日志:
tail -f /var/log/messages -n 200
- 观察最新添加的:
vip ip add show
- node1 模拟keepalived关闭状态:
systemctl stop keepalived
- 使用vip地址来访问rabbitmq集群
Federation Exchange
(broker北京)和(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小(Client北京)可以迅速将消息发送至exchangeA中,就算在开启了publisherconfirm机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client深圳)需要向exchangeA发送消息,那么(Client深圳)(broker北京)之间有很大的网络延迟(Client深圳)将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm机制或者事务机制的情况下(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认信息进而必然造成这条发送线程的性能降低,甚至造成一定程度上的 阻塞
将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,这里使用Federation插件就可以很好地解决这个问题
Federation Queue
联邦队列可以在多个Broker节点或者集群之间为单个队列提供均衡负载的功能。一个联邦队列可以 连接一个或者多个上游队列(upstream queue)并从这些上游队列中获取消息以满足本地消费者消费消息的需求
Shovel
Federation具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker 上。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理