RabbitMQ

RabbitMQ

简介

MQ(message queue)本质是个队列,FIFO 先入先出,只不过队列中存放的内容是 message而已,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务,使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务

功能

  • 流量消峰
  • 应用解耦
  • 异步处理

分类

  • ActiveMQ
    • 优点:单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据
    • 缺点:官方社区现在对ActiveMQ 5.x维护越来越少,高吞吐量场景较少使用
  • Kafka
    • 主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能, 肯定是首选kafka
  • RocketMQ
    • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ在稳定性上可能更值得信赖,如果你的业务有上述并发场景,建议可以选择RocketMQ
  • RabbitMQ
    • 结合erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的RabbitMQ

四大核心

  • 生产者:产生数据发送消息的程序
  • 交换机:是RabbitMQ非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定
  • 队列:是RabbitMQ内部使用的一种数据结构,尽管消息流经RabbitMQ和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式
  • 消费者:消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者
1
2
3
4
								MQ
|生产者| → |交换机 → 队列| → |消费者|
绑定关系
| ↘ 队列| → |消费者|

工作原理

1
2
3
4
5
6
7
8
9
10
				 |Connection| |Broker(RabbitMQ)| |Connection|
+----------+ +----------------+ +----------+
| | | ↗ Queue | | |
Producer→| Channel | |Exchange | | Channel |→Consumer
| | | ↘ Queue | | |
| Channel |→| |→| Channel |
| | | ↗ Queue | | |
Producer→| Channel | |Exchange | | Channel |→Consumer
| | | ↘ Queue | | |

  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
  • Virtual host:出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念。当多个不同的用户使用同一个RabbitMQ server提供的服务时可以划分出多个vhost,每个用户在自己的vhost创建 exchange/queue等
  • Connection:publisher/consumer和broker之间的TCP连接
  • Channel:如果每一次访问RabbitMQ都建立一个Connection在消息量大的时候建立TCPConnection的开销将是巨大的,效率也较低。Channel是在connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的channel 进行通讯,AMQP method包含了channel id帮助客户端和message broker识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统建立TCPconnection的开销
  • Exchange:message到达broker的第一站,根据分发规则匹配查询表中的routing key,分发消息到queue中去。常用的类型有direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
  • Queue:消息最终被送到这里等待consumer取走
  • Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据

配置

  • 启动服务:systemctl start rabbitmq-server
  • 查看服务状态:/sbin/service rabbitmq-server status
  • 停止服务(选择执行):/sbin/service rabbitmq-server stop
  • 开启web管理插件:rabbitmq-plugins enable rabbitmq_management
  • 用默认账号密码(guest)访问地址 http://mousse.cc:15672/出现权限问题
  • 创建账号:rabbitmqctl add_user admin 123
  • 设置用户角色:rabbitmqctl set_user_tags admin administrator
  • 设置用户权限:
    • set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    • rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    • 用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限
  • 当前用户和角色:rabbitmqctl list_users

HelloWorld

1
P → | | | | | | → C

步骤

  1. 依赖

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    <dependencies>
    <!-- RabbitMQ依赖客户端 -->
    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.13.1</version>
    </dependency>
    <!-- 操作文件流的依赖-->
    <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
    <dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.11.0</version>
    </dependency>
    </dependencies>
  2. 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public class Producer {

    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 发消息
    public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    // 设置工厂IP连接RabbitMQ的队列
    factory.setHost("mousse.cc");
    // 设置用户名密码
    factory.setUsername("admin");
    factory.setPassword("root");
    // 创建连接
    Connection connection = factory.newConnection();
    // 获取信道
    Channel channel = connection.createChannel();
    /*
    产生队列:
    1.队列名称
    2.队列里的消息是否持久化(磁盘), 默认消息存储在内存中
    3.队列是否只供一个消费者进行消费, 是否进行消息共享
    4.最后一个消费者断开连接后队列是否自动删除
    5.其它参数
    */
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 发消息
    String message = "Hello World";
    /*
    发送一个消费:
    1.发送到哪个交换机
    2.路由的Key值, 这次是队列名
    3.其它参数信息
    4.发送消息的消息体, 需要调取二进制
    */
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println("消息发送完毕");
    }

    }
  3. 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public class Consumer {

    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
    //创建连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("mousse.cc");
    factory.setUsername("admin");
    factory.setPassword("root");
    Channel channel = factory.newConnection().createChannel();
    // 声明接收消息
    DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody()));
    // 取消消息时的回调
    CancelCallback cancelCallback = consumerTag -> System.out.println("消息消费被中断");
    /*
    消费者消费消息
    1.消费哪个队列
    2.消费成功之后是否要自动应答
    3.消费者未成功消费的回调
    4.消费者取消消费的回调
    */
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    }

Work Queues

工作队列又称任务队列,主要思想是避免立即执行资源密集型任务而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务

1
2
3
4
5
									  ↗接收消息→ 工作线程
生产者 →大量消息→ 队列 →接收消息→ 工作线程 // 轮训分发消息, 三者关系为竞争关系
↘接收消息→ 工作线程

// 注意事项: 一个消息只能被处理一次,不可以处理多次

轮训分发消息

  1. 抽取工具类

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class RabbitMqUtils {

    // 得到一个连接的channel
    public static Channel getChannel() throws IOException, TimeoutException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("mousse.cc");
    factory.setUsername("admin");
    factory.setPassword("root");
    return factory.newConnection().createChannel();
    }

    }
  2. 启动两个工作线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public class Worker1 {

    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 接收消息
    public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = RabbitMqUtils.getChannel();
    DeliverCallback deliverCallback = (consumerTag, message) ->
    System.out.println("接收到的消息: " + new String(message.getBody()));
    CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
    System.out.println("C1等待接收消息");
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }

    }
  3. 启动一个发送线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public class Task {

    // 队列名称
    public static final String QUEUE_NAME = "hello";

    // 发送大量消息
    public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = RabbitMqUtils.getChannel();
    // 队列声明
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    // 从控制台中接收信息
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNext()) {
    String message = scanner.next();
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
    System.out.println("发送消息完成: " + message);
    }
    }

    }

消息应答

消费者在接收到消息并且处理该消息之后,告诉RabbitMQ它已经处理了,RabbitMQ才可以把该消息删除,保证了消息在发送过程中不丢失

自动应答

  • 消息发送后立即被认为已经传送成功
  • 这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前消费者那边出现连接或者channel关闭,那么消息就丢失了
  • 没有对传递的消息数量进行限制
  • 这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

手动应答

  • 好处是可以批量应答并且减少网络拥堵

  • Channel.basicAck:用于肯定确认

    • RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack:用于否定确认

  • Channel.basicReject:用于否定确认

    • 与Channel.basicNack相比少一个参数
    • 不处理该消息了直接拒绝,可以将其丢弃
  • 代码

    1. 消息生产者

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      public class Task {

      // 队列名称
      public static final String QUEUE_NAME = "ack_queue";

      public static void main(String[] args) throws IOException, TimeoutException {
      Channel channel = RabbitMqUtils.getChannel();
      // 声明队列
      channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      // 从控制台中输入信息
      Scanner scanner = new Scanner(System.in);
      while (scanner.hasNext()) {
      String message = scanner.next();
      channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
      System.out.println("生产者发出消息: " + message);
      }
      }

      }
    2. 消费者1

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      public class Worker1 {

      // 队列名称
      public static final String QUEUE_NAME = "ack_queue";

      // 消费信息
      public static void main(String[] args) throws IOException, TimeoutException {
      Channel channel = RabbitMqUtils.getChannel();
      System.out.println("C1等待消息处理, 时间较短");
      DeliverCallback deliverCallback = (consumerTag, message) -> {
      // 沉睡1s
      SleepUtils.sleep(1);
      System.out.println("接收到的消息: " + new String(message.getBody()));
      /*
      手动应答:
      1.消息的标记tag
      2.是否批量应答
      */
      channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      };
      CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
      // 设置为手动应答
      channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
      }

      }
    3. 消费者2

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      public class Worker2 {

      // 队列名称
      public static final String QUEUE_NAME = "ack_queue";

      // 消费信息
      public static void main(String[] args) throws IOException, TimeoutException {
      Channel channel = RabbitMqUtils.getChannel();
      System.out.println("C2等待消息处理, 时间较长");
      DeliverCallback deliverCallback = (consumerTag, message) -> {
      // 沉睡1s
      SleepUtils.sleep(30);
      System.out.println("接收到的消息: " + new String(message.getBody()));
      /*
      手动应答:
      1.消息的标记tag
      2.是否批量应答
      */
      channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
      };
      CancelCallback cancelCallback = consumerTag -> System.out.println(consumerTag + "消息者取消消费接口回调逻辑");
      // 设置为手动应答
      channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
      }

      }

Multiple

  • true:代表批量应答channel上未应答的消息
    • 比如说channel上有传送tag的消息5,6,7,8当前tag是8,那么此时5-8的这些还未应答的消息都会被确认收到消息应答
  • false:同上面相比只会应答tag=8的消息,5/6/7这三个消息依然不会被确认收到消息应答

消息自动重新入队

  • 如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或TCP连接丢失)导致消息 未发送ACK确认,RabbitMQ将了解到消息未完全处理并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样即使某个消费者偶尔死亡也可以确保不会丢失任何消息

持久化

默认情况下RabbitMQ退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化,这时候即使重启rabbitmq队列也依然存在

队列持久化

1
2
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
  • 如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

消息实现持久化

1
2
3
// MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

  • 将消息标记为持久化并不能完全保证不会丢失消息

不公平分发

  • 轮训分发在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另外一个消费者2处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,RabbitMQ并不知道这种情况它依然很公平的进行分发
  • 为了避免这种情况可以设置参数:channel.basicQos(1);

预取值

  • 限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题
  • 通过使用basicQos方法设置“预取计数”值来完成的,该值定义通道上允许的未确认消息的最大数量
    • 0时为公平分发,1时为不公平分发,>1时为预取值
  • 虽然自动应答传输消息速率是最佳的,但在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的RAM消耗

发布确认

原理

生产者将信道设置成confirm模式:一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始)一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置 basic.ack的multiple域表示到这个序列号之前的所有消息都已经得到了处理

1
2
3
4
生产者 →发消息→ 队列
// 1.队列必须持久化
// 2.队列中的消息必须持久化
// 3.发布确认: RabbitMQ将发来的消息保存在磁盘上后再发送

confirm模式最大的好处在于他是异步的:一旦发布一条消息生产者应用程序就可以在等信 道返回确认的同时继续发送下一条消息,当消息最终得到确认之后生产者应用便可以通过回调方法来处理该确认消息

如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条 nack消息,生产者应用程序同样可以在回调方法中处理该nack消息

策略

发布确认默认是没有开启的,如果要开启需要调用方法confirmSelect,每当你要想使用发布 确认,都需要在channel上调用该方法

1
channel.confirmSelet();

单个确认发布

  • 同步等待确认,简单,但吞吐量非常有限
    • 一种同步确认发布的方式:发布一个消息之后只有它被确认发布,后续的消息才能继续发布
    • 个最大的缺点:发布速度特别的慢
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void publishMessageIndividually() throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long start = System.currentTimeMillis();
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
byte[] message = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
channel.basicPublish("", queueName, null, message);
if (channel.waitForConfirms()) {
System.out.println("消息发送成功: " + i);
}
}
// 结束时间
long finish = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条单独确认消息总用时: " + (finish - start) + "ms");
}

批量确认发布

  • 批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条
    消息出现了问题
    • 先发布一批消息然后一起确认可以极大地提高吞吐量
    • 缺点:当发生故障导致发布出现问题时不知道是哪个消息出现问题,必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息
    • 这种方案仍然是同步的,也一样阻塞消息的发布
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void publishMessageBatch() throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long start = System.currentTimeMillis();
// 批量确认消息大小
int batchSize = 100;
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
byte[] message = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
channel.basicPublish("", queueName, null, message);
if ((i + 1) % batchSize == 0) {
if (channel.waitForConfirms()) {
System.out.println("消息发送成功: " + i);
}
}
}
// 结束时间
long finish = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条批量确认消息总用时: " + (finish - start) + "ms");
}

异步确认发布

  • 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些
    • 性价比最高,无论是可靠性还是效率都高, 他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static void publishMessageAsync() throws Exception {
// 获取信道
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName, false, false, false, null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long start = System.currentTimeMillis();
// 消息确认成功回调函数
ConfirmCallback ackCallback = (deliveryTag, multiple) -> System.out.println("已确认的消息: " + deliveryTag);
// 消息确认成功回调函数
ConfirmCallback nackCallback = (deliveryTag, multiple) -> System.out.println("未确认的消息: " + deliveryTag);
/*
消息监听器:
1.监听哪些消息成功
2.监听哪些消息失败
*/
channel.addConfirmListener(ackCallback, nackCallback);
// 批量发消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
byte[] message = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
channel.basicPublish("", queueName, null, message);
}
// 结束时间
long finish = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条异步确认消息总用时: " + (finish - start) + "ms");
}
  • 处理异步未确认消息

    • 最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public static void publishMessageAsync() throws Exception {
    Channel channel = RabbitMqUtils.getChannel();
    String queueName = UUID.randomUUID().toString();
    channel.queueDeclare(queueName, false, false, false, null);
    channel.confirmSelect();
    /*
    线程安全有序的哈希表, 适用于高并发的情况
    能轻松将序号与消息进行关联
    能轻松批量删除条目
    支持高并发(多线程)
    */
    ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
    // 消息确认成功回调函数
    ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
    if (multiple) {
    // 删除已确认的消息, 剩下的都是未确认的消息
    ConcurrentNavigableMap<Long, String> confirmedMap = outstandingConfirms.headMap(deliveryTag);
    confirmedMap.clear();
    } else {
    outstandingConfirms.remove(deliveryTag);
    }
    System.out.println("已确认的消息: " + deliveryTag);
    };
    // 消息确认成功回调函数
    ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
    // 答应未确认的消息
    String message = outstandingConfirms.get(deliveryTag);
    System.out.println("未确认的消息: " + deliveryTag + "\tTag: " + message);
    };
    /*
    消息监听器:
    1.监听哪些消息成功
    2.监听哪些消息失败
    */
    channel.addConfirmListener(ackCallback, nackCallback);
    long start = System.currentTimeMillis();
    for (int i = 0; i < MESSAGE_COUNT; i++) {
    String message = String.valueOf(i);
    channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8));
    // 记录所有要发送的消息
    outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
    }
    long finish = System.currentTimeMillis();
    System.out.println("发布" + MESSAGE_COUNT + "条异步确认消息总用时: " + (finish - start) + "ms");
    }

交换机

消息传达给多个消费者,这种模式称为“发布/订阅”

1
2
3
4
5
6
7
8
9
// 工作队列, 三者关系为竞争关系
↗ 消费者C1 // 消息只能被消费一次
生产者 → 队列 → 消费者C2 // 消息只能被消费一次
↘ 消费者C3 // 消息只能被消费一次

// 发布/订阅
↗绑定→ 队列 → 消费者C1 // 消息只能被消费一次
生产者 → 交换机 →绑定→ 队列 → 消费者C2 // 消息只能被消费一次
↘绑定→ 队列 → 消费者C3 // 消息只能被消费一次

Exchange

生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange)

交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列

类型

  • 直接:direct
  • 主题:topic
  • 标题:headers
  • 扇出:fanout

无名exchange

  • 空字符串表示默认或无名称交换机
  • 消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的,如果它存在的话

临时队列

不带有持久化的队列

可以创建一个具有随机名称的队列或能让服务器为我们选择一个随机队列名称那就更好了,一旦断开了消费者的连接队列将被自动删除

1
2
// 创建临时队列的方式
String queueName = channel.queueDeclare().getQueue();

绑定

binding是exchange和queue之间的桥梁,它告诉我们exchange和那个队列进行了绑定关系

Fanout

将接收到的所有消息广播到它知道的所有队列中。系统中默认有的exchange类型

routingKey相同

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 发消息给交换机
public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息: " + message);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 消费者, 1/2代码相同
public class ReceiveLogs1 {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
/*
声明一个交换机
1.交换机的名称
2.交换机的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/*

声明一个临时队列, 队列名称是随机的
当消费者断开与队列的连接, 队列就自动删除
*/
String queueName = channel.queueDeclare().getQueue();
/*
绑定交换机与队列
1.队列名
2.交换机名
3.routingKey
*/
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("C1等待接收消息");
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8));
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}

}

Direct

队列只对它绑定的交换机的消息感兴趣,绑定之后的 意义由其交换类型决定

工作方式:消息只去到它绑定的routingKey队列中去

routingKey不相同

多重绑定

  • exchange的绑定类型是direct,但是它绑定的多个队列的key都相同
  • 表现的就和fanout有类似
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class DirectLogs {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String message = scanner.next();
if (message.contains("info")) {
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes(StandardCharsets.UTF_8));
}
if (message.contains("warning")) {
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes(StandardCharsets.UTF_8));
}
if (message.contains("error")) {
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(StandardCharsets.UTF_8));
}
System.out.println("生产者发出消息: " + message);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ReceiveLogsDirect1 {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
String queueName = "console";
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息" + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("C1准备接收消息");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ReceiveLogsDirect2 {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
String queueName = "disk";
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("接收到的消息" + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("C2准备接收消息");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}

}

Topic

routing_key不能随意写,它必须是一个单词列表,以点号分隔开,比如”stock.usd.nyse”,”nyse.vmw”,”quick.orange.rabbit”,单词列表最多不能超过 255字节

  • *:代替一个单词
  • #:替代零个或多个单词
1
2
3
4
5
6
7
8
9
10
11
12
13
									  ↗*.orange.*→ Q1 → C1
Producer → Exchange →*.*.rabbit↘
↘lazy.#→→→→→→ Q2 → C2
/*
quick.orange.rabbit 被队列Q1Q2接收到
lazy.orange.elephant 被队列Q1Q2接收到
quick.orange.fox 被队列Q1接收到
lazy.brown.fox 被队列Q2接收到
lazy.pink.rabbit 虽然满足两个绑定但只被队列Q2接收一次
quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit 是四个单词但匹配Q2
*/

当一个队列绑定键是#,那么这个队列将接收所有数据,类似fanout

若队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit", "被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant", "被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox", "被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox", "被队列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
bindingKeyMap.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");
for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
String routingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息: " + message + "\troutingKey: " + routingKey);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class ReceiveLogsTopic1 {

private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q1";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println(
"接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8) +
"\t接收队列: " + QUEUE_NAME +
"\t绑定键: " + message.getEnvelope().getRoutingKey()
);
System.out.println("C1准备接收消息");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReceiveLogsTopic2 {

private static final String EXCHANGE_NAME = "topic_logs";
private static final String QUEUE_NAME = "Q2";

public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
DeliverCallback deliverCallback = (consumerTag, message) ->
System.out.println(
"接收到的消息: " + new String(message.getBody(), StandardCharsets.UTF_8) +
"\t接收队列: " + QUEUE_NAME +
"\t绑定键: " + message.getEnvelope().getRoutingKey()
);
System.out.println("C2准备接收消息");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
});
}

}

死信队列

某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理就变成了死信,有死信自然就有了死信队列

为了保证订单业务的消息数据不丢失需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中

  • 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效

来源

1
2
3
4
5
6
Map<String, Object> arguments = new HashMap<>();
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", routing_key);
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

消息TTL过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Producer {

private static final String NORMAL_EXCHANGE = "normal_exchange";

public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
for (int i = 0; i < 10; i++) {
String message = "info" + (i + 1);
// 设置TTL时间, 单位: ms
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息: " + message);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public class Consumer1 {

// 普通交换机名
private static final String NORMAL_EXCHANGE = "normal_exchange";
// 死信交换机名
private static final String DEAD_EXCHANGE = "dead_exchange";
// 普通队列名
private static final String NORMAL_QUEUE = "normal_queue";
// 死信队列名
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明死信队列
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
// 绑定死信交换机和队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
// 常队列绑定死信队列信息
Map<String, Object> arguments = new HashMap<>();
/*
设置过期时间, 单位: ms
也可以从生产者发送消息时设置
arguments.put("x-message-ttl", 100000);
*/
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "lisi");
channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("C1接收消息: " + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("C1等待接收消息");
channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
});
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Consumer2 {

private static final String DEAD_EXCHANGE = "dead_exchange";
private static final String DEAD_QUEUE = "dead_queue";

public static void main(String[] args) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("C2接收消息: " + new String(message.getBody(), StandardCharsets.UTF_8));
System.out.println("C2等待接收消息");
channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {
});
}
}

队列达到最大长度

  • 队列满了,无法再添加数据到MQ中
1
2
// 队列长度>6时将会变成死信
arguments.put("x-max-length", 6);

消息被拒绝

  • basic.reject或basic.nack并且requeue=fals
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DeliverCallback deliverCallback = (consumerTag, message) -> {
String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
// 如果消息中包含偶数则拒收该消息
if (Integer.parseInt(messageBody.substring(4)) % 2 == 0) {
// false: 不再回到队列, 直接放入死信
channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
} else {
System.out.println("C1接收消息: " + messageBody);
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
};
System.out.println("C1等待接收消息");
// 手动确认
channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {
});

延迟队列

延时队列就是用来存放需要在指定时间被处理的使用场景元素的队列

使用场景:订单在十分钟之内未支付则自动取消

TTL

方式1:针对每条消息设置 TTL

1
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();

方式2:在创建队列的时候设置

1
arguments.put("x-message-ttl", 100000);

SpringBoot

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
<dependencies>
<!-- RabbitMQ 依赖 -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.4</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.5.4</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.78</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>

<!-- swagger -->
<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>3.0.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/io.springfox/springfox-swagger-ui -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>3.0.0</version>
</dependency>

<!-- RabbitMQ 测试依赖 -->
<!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit-test -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>2.3.10</version>
<scope>test</scope>
</dependency>
</dependencies>

配置文件

1
2
3
4
spring.rabbitmq.host=mousse.cc
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=root

Swagger配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
@EnableSwagger2
public class SwaggerConfig {

@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2).groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {
return new ApiInfoBuilder().title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义").version("1.0")
.contact(new Contact("Tom", "mousse.cc", "tom@mousse.cc"))
.build();
}
}

死信实现

缺陷

  • RabbitMQ只会检查第一个消息是否过期
  • 如果过期则丢到死信队列中一个消息的延时时长很长而第二个消息的延时时长很短,第二个消息并不会优先得到执行
  • 需要通过插件弥补

队列 TTL

创建两个队列QA和QB,两者队列TTL分别设置为10s和40s,然后在创建一个交换机X和死信交换机 Y,它们的类型都是direct

1
2
3
		  ↗XA→ QA(10s) →YD↘
P → X Y →YD→ QD → C
↘XB→ QB(40s) →YD↗

配置文件类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Configuration
@EnableSwagger2
public class SwaggerConfig {

@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2).groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}

private ApiInfo webApiInfo() {
return new ApiInfoBuilder().title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义").version("1.0")
.contact(new Contact("Tom", "mousse.cc", "tom@mousse.cc"))
.build();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Configuration
public class TtlQueueConfig {

// 普通交换机名
public static final String X_EXCHANGE = "X";
// 死信交换机名
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
// 普通队列名
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
// 死信队列名
public static final String DEAD_LETTER_QUEUE = "QD";

// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange() {
return new DirectExchange(X_EXCHANGE);
}

// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange() {
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}

// 声明普通队列A, 需要TTL10s
@Bean("queueA")
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>(3);
// 正常队列设置死信交换机
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
// 设置死信RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
// 设置过期时间
arguments.put("x-message-ttl", 10000);
return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();
}

// 声明普通队列B, 需要TTL40s
@Bean("queueB")
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>(3);
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "YD");
arguments.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();
}

// 声明死信队列
@Bean("queueD")
public Queue queueD() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

// 绑定
@Bean
public Binding queueABindingX() {
return BindingBuilder.bind(queueA()).to(xExchange()).with("XA");
}

@Bean
public Binding queueBBindingX() {
return BindingBuilder.bind(queueB()).to(xExchange()).with("XB");
}

@Bean
public Binding queueDBindingY() {
return BindingBuilder.bind(queueD()).to(yExchange()).with("YD");
}

}

消息生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 日志
@Slf4j
@RestController
@RequestMapping("/ttl")
// 发送延迟消息
public class SendMsgController {

@Autowired
private RabbitTemplate rabbitTemplate;

// 开始发消息
@GetMapping("/sendMsg/{message}")
public void sendMessage(@PathVariable String message) {
log.info("{}: 发送一条信息给两个TTL队列\"{}\"", new Date(), message);
rabbitTemplate.convertAndSend(X_EXCHANGE, "XA", "消息来自TTL=10s的队列: " + message);
rabbitTemplate.convertAndSend(X_EXCHANGE, "XB", "消息来自TTL=40s的队列: " + message);
}

}

消息消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
// 队列TTL消费者
public class DeadLetterQueueConsumer {

// 接收消息
@RabbitListener(queues = DEAD_LETTER_QUEUE)
public void receiveD(Message message) throws Exception {
String msgBody = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("{}: 收到死信队列的消息\"{}\"", new Date(), msgBody);
}

}

延时队列优化

1
2
3
4
		  ↗XA→ QA(10s) →YD↘
P → X →XB→ QB(40s) →YD→ Y →YD→ QD → C
↘XC→ QC(XXs) →YD↗
// 新增了一个队列 QC,绑定关系如下,该队列不设置 TTL 时间

缺陷

  • RabbitMQ只会检查第一个消息是否过期
  • 如果过期则丢到死信队列中一个消息的延时时长很长而第二个消息的延时时长很短,第二个消息并不会优先得到执行
  • 需要通过插件弥补

配置文件类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static final String QUEUE_C = "QC";

@Bean("queueC")
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>(3);
arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}

@Bean
public Binding queueCBindingX() {
return BindingBuilder.bind(queueC()).to(yExchange()).with("XC");
}

消息生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 发带TTL的消息
@GetMapping("/sendExpMsg/{ttl}/{msg}")
public void sendMessage(@PathVariable String ttl, @PathVariable String msg) {
log.info("{}: 发送一条信息给时长为{}ms的TTL队列\"{}\"", new Date(), ttl, msg);
rabbitTemplate.convertAndSend(
X_EXCHANGE,
"XC",
"消息来自TTL=" + ttl + "ms的队列: " + msg,
msgVal -> {
msgVal.getMessageProperties().setExpiration(ttl);
return msgVal;
});
}
// RabbitMQ只会检查第一个消息是否过期
// 如果过期则丢到死信队列中一个消息的延时时长很长而第二个消息的延时时长很短,第二个消息并不会优先得到执行

插件实现

在官网上https://www.rabbitmq.com/community-plugins.html下载 rabbitmq_delayed_message_exchange插件,解压放置到RabbitMQ的插件目录

进入RabbitMQ的安装目录下的plgins目录,执行下面命令让该插件生效,然后重启 RabbitMQ
/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins rabbitmq-plugins enable rabbitmq_delayed_message_exchange

1
2
3
4
5
6
7
8
9
// 基于死信
生产者 → 交换机 → 队列[TTL]

死信交换机

死信队列 → 消费者

// 基于插件
生产者 → 交换机[延迟] → 队列 → 消费者

配置文件类代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
// 基于插件的延迟配置类
public class DelayedQueueConfig {

// 交换机
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
// 队列
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
// routingKey
public static final String DELAYED_ROUTING_KEY = "delayed.routing_key";

// 声明交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
// 自定义交换机的类型
// 不要用枚举!!!
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message",
true, false, args);

}

// 声明队列
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE_NAME).build();
}

// 绑定队列
@Bean
public Binding delayedQueueBindingDelayedExchange() {
// noargs: 构建方法
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_ROUTING_KEY).noargs();
}

}

消息生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
// 发基于插件的延迟消息
@GetMapping("/sendDelayMsg/{msg}/{delay}")
public void sendMessage(@PathVariable(value = "msg") String msg, @PathVariable(value = "delay") Integer delay) {
log.info("{}: 发送一条信息给时长为{}ms的delay队列, 消息为\"{}\"", new Date(), delay, msg);
rabbitTemplate.convertAndSend(
DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DelayedQueueConfig.DELAYED_ROUTING_KEY,
"消息来自delay=" + delay + "ms的队列, 消息为\"" + msg + "\"",
msgVal -> {
msgVal.getMessageProperties().setDelay(delay);
return msgVal;
});
}

消息消费者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Component
// 消费基于插件的延迟消息
public class DelayedQueueConsumer {

// 监听消息
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message msg) {
String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("{}: 收到延迟队列的消息\"{}\"", new Date(), msgBody);
}

}

总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用 RabbitMQ的特性,如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。通过RabbitMQ集群的特性可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失

延时队列还有很多其它选择,比如Java的DelayQueue,Redis的zset,Quartz或利用kafka的时间轮,这些方式各有特点

发布确认高级

在生产环境中由于一些不明原因导致RabbitMQ重启,在重启期间生产者消息投递失败导致消息丢失,需要手动处理和恢复

SpringBoot

配置文件

  • 在配置文件当中需要添加:spring.rabbitmq.publisher-confirm-type=correlated
    • NONE:禁用发布确认模式,默认值
    • CORRELATED:发布消息成功到交换器后会触发回调方法
    • SIMPLE:
      • 效果1:和CORRELATED值一样会触发回调方法
      • 效果2:在发布消息成功后使用rabbitTemplate调用waitForConfirms或 waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑
        • 要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker
1
2
3
4
5
spring.rabbitmq.host=mousse.cc
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=root
spring.rabbitmq.publisher-confirm-type=correlated

添加配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
// 发布确认高级
public class ConfirmConfig {

public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";

@Bean
public Exchange confirmExchange() {
return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).build();
}

@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

@Bean
public Binding confirmQueueBindingConfirmExchange() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY).noargs();
}

}

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
@RestController
@RequestMapping("/confirm")
// 发布确认
public class ProduceController {

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("/sendMsg/{msg}")
public void sendMsg(@PathVariable String msg) {
log.info("{}: 发送一条信息: {}", new Date(), msg);
// 需要自己创建CorrelationData的对象
rabbitTemplate.convertAndSend(
ConfirmConfig.CONFIRM_EXCHANGE_NAME,
ConfirmConfig.CONFIRM_ROUTING_KEY,
msg,
new CorrelationData("1")
);
}

}

回调接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
// 需要声明成组件
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

// 注入到内部接口中, 一定要在注入rabbitTemplate后写
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
}

/**
* 交换机确认回调方法
*
* @param correlationData 保存回调消息的ID及相关信息
* @param ack 交换机接收到消息为true, 反之为false
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("{}: 交换机成功接收到ID为{}的消息", new Date(), id);
} else {
log.info("{}: 交换机还未接收到ID为{}的消息, 原因: {}", new Date(), id, cause);
}
}

}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Slf4j
@Component
public class ConfirmConsumer {

@RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE_NAME)
public void receiveConfirm(Message msg) {
log.info(
"{}: 收到队列" + msg.getMessageProperties().getConsumerQueue() + "的消息: {}",
new Date(),
new String(msg.getBody(), StandardCharsets.UTF_8)
);
}

}

回退消息

Mandatory

  • 在仅开启了生产者确认机制的情况下交换机接收到消息后会直接给消息生产者发送确认消息,如果发现该消息不可路由那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的
  • 通过设置mandatory参数可以在当消息传递过程中不可达目的地时将消息返回给生产者
1
2
3
4
5
6
spring.rabbitmq.host=mousse.cc
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=root
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

回调接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@Slf4j
// 需要声明成组件
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

@Autowired
private RabbitTemplate rabbitTemplate;

// 注入到内部接口中, 一定要在注入rabbitTemplate后写
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

/**
* 交换机确认回调方法
*
* @param correlationData 保存回调消息的ID及相关信息
* @param ack 交换机接收到消息为true, 反之为false
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId() : "";
String exchange = correlationData != null && correlationData.getReturned() != null ? correlationData.getReturned().getExchange() : "";
if (ack) {
log.info("{}: 交换机{}成功接收到ID为{}的消息", new Date(), exchange, id);
} else {
log.info("{}: 交换机{}还未接收到ID为{}的消息, 原因: {}", new Date(), exchange, id, cause);
}
}

/**
* 只有在当消息传递过程中不可达目的地时将消息返回给生产者
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info(
"{}: 消息{}被交换机{}退回, 原因: {}, 路由Key: {}",
new Date(),
new String(returned.getMessage().getBody(), StandardCharsets.UTF_8),
returned.getExchange(),
returned.getReplyText(),
returned.getRoutingKey()
);
}

}

备份交换机

1
2
3
4
5
6
7
8
							type=direct
producer → confirm.exchange → confirm.queue → confirm consumer

无法投送的消息将发送给备份交换机

↓ ↗ backup.queue →
backup.exchange
type=fanout ↘ warning.queue → warning consumer

修改配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Configuration
// 发布确认高级
public class ConfirmConfig {

public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
public static final String BACKUP_QUEUE_NAME = "backup.queue";
public static final String WARNING_QUEUE_NAME = "warning.queue";
public static final String CONFIRM_ROUTING_KEY = "key1";


@Bean
public DirectExchange confirmExchange() {
return ExchangeBuilder
.directExchange(CONFIRM_EXCHANGE_NAME)
.withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME) // 指明备份交换机
.build();
}

@Bean
public FanoutExchange backupExchange() {
return ExchangeBuilder.fanoutExchange(BACKUP_EXCHANGE_NAME).build();
}

@Bean
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}

@Bean
public Queue backupQueue() {
return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
}

@Bean
public Queue warningQueue() {
return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
}

@Bean
public Binding confirmQueueBindingConfirmExchange() {
return BindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(CONFIRM_ROUTING_KEY);
}

@Bean
public Binding backupQueueBindingBackupExchange() {
return BindingBuilder.bind(backupQueue()).to(backupExchange());
}

@Bean
public Binding warningQueueBindingBackupExchange() {
return BindingBuilder.bind(warningQueue()).to(backupExchange());

}

}

报警消费者

1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@Component
// 报警消费者
public class WarningConsumer {

@RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
public void receiveWarningMsg(Message msg) {
log.info("{}: 发现不可路由消息: {}", new Date(), new String(msg.getBody(), StandardCharsets.UTF_8));
}

}

注意事项

重新启动项目的时候需要把原来的confirm.exchange删除因为我们修改了其绑定属性,不然报错

mandatory参数与备份交换机可以一起使用的时候,备份交换机优先级高

其他知识点

幂等性

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。在单应用系统中只需要把数据操作放入事务中即可,发生错误立即回滚,但是在响应客户端的时候也有可能出现网络中断或者异常等等

消息重复消费

  • 消费者在消费MQ中的消息时MQ已把消息发送给消费者,消费者在给MQ返回ack时网络中断, 故MQ未收到确认信息,该条消息会重新发给其他的消费者或者在网络重连后再次发送给该消费者,实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息

消费端的幂等性保障

  • 在海量订单生成的业务高峰期生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性:即使收到了一样的消息,消息也永远不会被消费多次
  • 业界主流的幂等性有两种操作:
    • 唯一ID+指纹码机制,利用数据库主键去重
    • 利用redis的原子性去实现

唯一ID+指纹码机制

  • 指纹码:一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个id是否存
  • 在数据库中优势就是实现简单就一个拼接,然后查询判断是否重复,劣势就是在高并发时如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,不是最推荐

Redis原子性

  • 利用redis执行setnx命令,天然具有幂等性

优先级队列

0-255,越大优先级越高,(建议0-10)

使用场景:订单催付

  • redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级, 否则就是默认优先级

添加步骤

  1. 队列中代码添加优先级

    1
    2
    3
    Map<String, Object> arguments = new HashMap<>();
    // 官方允许0-255, 此处允许范围0-10
    arguments.put("x-max-priority", 10);
  2. 消息中代码添加优先级

    1
    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(randomNumber).build();

惰性队列

惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时惰性队列就很有必要了

默认情况下生产者将消息发送到RabbitMQ的时,队列中的消息会尽可能的存储在内存之中, 这样可以更加快速的将消息发送给消费者。即使是持久化的消息在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时会将内存中的消息换页至磁盘中,这个操作会耗费较长时间也会阻塞队列的操作进而无法接收新的消息

在发送1百万条消息,每条消息大概占1KB的情况下普通队列占用内存1.2GB,惰性队列仅 占用1.5MB

两种模式

  • 在队列声明的时候可以通过x-queue-mode参数来设置队列的模式
    • default:默认,在3.6.0之前的版本无需做任何变更
    • lazy:惰性队列模式,可以通过调用channel.queueDeclare方法的时候在参数中设置,也可以通过Policy的方式设置
  • 如果一个队列同时使用这两种方式设置,那么Policy的方式具备更高的优先级
  • 如果要通过声明的方式改变已有队列的模式的话只能先删除队列再重新声明一个新的

集群

步骤

  1. 修改机器的主机名称:vim /etc/hostname

  2. 配置各个节点的hosts文件,让各个节点都能互相识别对方:vim /etc/hosts

    1
    2
    3
    10.211.55.74 node1
    10.211.55.75 node2
    10.211.55.76 node3
  3. 以确保各个节点的cookie文件使用的是同一个值

    • 在node1上执行远程操作命令:
      • scp /var/lib/rabbitmq/.erlang.cookie root@node2:/var/lib/rabbitmq/.erlang.cookie
      • scp /var/lib/rabbitmq/.erlang.cookie root@node3:/var/lib/rabbitmq/.erlang.cookie
  4. 启动RabbitMQ服务,顺带启动Erlang虚拟机和RbbitMQ应用服务(在三台节点上分别执行):rabbitmq-server -detached

  5. 在节点2执行:

    • rabbitmqctl stop_app
      • rabbitmqctl stop会将Erlang虚拟机关闭
      • rabbitmqctl stop_app只关闭RabbitMQ服务
    • rabbitmqctl reset
    • rabbitmqctl join_cluster rabbit@node1
    • rabbitmqctl start_app
      • 只启动应用服务
  6. 在节点3执行:

    • rabbitmqctl stop_app
    • rabbitmqctl reset
    • rabbitmqctl join_cluster rabbit@node2 rabbitmqctl start_app
  7. 集群状态:rabbitmqctl cluster_status

  8. 需要重新设置用户

    • 创建账号:rabbitmqctl add_user admin 123
    • 设置用户角色:rabbitmqctl set_user_tags admin administrator
    • 设置用户权限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
  9. 解除集群节点(node2和node3机器分别执行):

    • rabbitmqctl stop_app
    • rabbitmqctl reset
    • rabbitmqctl start_app
    • rabbitmqctl cluster_status
    • rabbitmqctl forget_cluster_node rabbit@node2(node1机器上执行)

镜像队列

引入镜像队列(Mirror Queue)的机制可以将队列镜像到集群中的其他Broker节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性

搭建

  • 在节点添加policy
    • Name:策略名,没有意义只有语意
    • Pattern:正则表达式,匹配此规则的交换机/队列名才会使用策略

Haproxy+Keepalive

实现高可用负载均衡

1
2
3
4
5
6
7
							  VIP
↙ ↘
HAProxy node11 node21 负载均衡
Keepalived ↓ ↓
+-+---+---+-+
↓ ↓ ↓
RabbitMQ node1 node2 node3

HAProxy

  • 提供高可用性、负载均衡及基于TCPHTTP应用的代理,支持虚拟主机,它是免费、快速并且可靠的一种解决方案
  • 搭建步骤:
    1. 下载haproxy(在node1和node2):yum -y install haproxy
    2. 修改node1和node2的haproxy.cfg
      • vim /etc/haproxy/haproxy.cfg
      • 需要修改红色 IP 为当前机器 IP
    3. 在两台节点启动haproxy
      • haproxy -f /etc/haproxy/haproxy.cfg
      • ps -ef | grep haproxy
    4. 访问地址:http://10.211.55.71:8888/stats

Keepalived

  • 实现双机(主备)热备
  • 如果前面配置的HAProxy主机突然宕机或者网卡失效,那么虽然RbbitMQ集群没有任何故障但是对于外界的客户端来说所有的连接都会被断开结果将是灾难性的
  • 为了确保负载均衡服务的可靠性同样显得十分重要,这里就要引入Keepalived它能够通过自身健康检查、资源接管功能做高可用(双机热备),实现故障转移
  • 搭建步骤:
    1. 下载keepalived:yum -y install keepalived
    2. 节点node1配置文件:vim /etc/keepalived/keepalived.conf
      • 把资料里面的keepalived.conf修改之后替换
    3. 节点node2配置文件
      1. 需要修改global_defs的router_id,如nodeB
      2. 其次要修改vrrp_instance_VI中state为”BACKUP”
      3. 最后要将priority设置为小于100的值
    4. 添加haproxy_chk.sh
      • 为了防止HAProxy服务挂掉之后Keepalived还在正常工作而没有切换到Backup 上,所以这里需要编写一个脚本来检测HAProxy务的状态,当HAProxy服务挂掉之后该脚本会自动重启HAProxy的服务,如果不成功则关闭Keepalived服务,这样便可以切换到Backup继续工作
      • vim /etc/keepalived/haproxy_chk.sh(可以直接上传文件)
      • 修改权限:chmod 777 /etc/keepalived/haproxy_chk.sh
    5. 启动keepalive命令(node1和node2启动):systemctl start keepalived
    6. 观察Keepalived的日志:tail -f /var/log/messages -n 200
    7. 观察最新添加的:vip ip add show
    8. node1 模拟keepalived关闭状态:systemctl stop keepalived
    9. 使用vip地址来访问rabbitmq集群

Federation Exchange

(broker北京)和(broker深圳)彼此之间相距甚远,网络延迟是一个不得不面对的问题。有一个在北京的业务(Client北京)需要连接(broker北京),向其中的交换器exchangeA发送消息,此时的网络延迟很小(Client北京)可以迅速将消息发送至exchangeA中,就算在开启了publisherconfirm机制或者事务机制的情况下,也可以迅速收到确认信息。此时又有个在深圳的业务(Client深圳)需要向exchangeA发送消息,那么(Client深圳)(broker北京)之间有很大的网络延迟(Client深圳)将发送消息至exchangeA会经历一定的延迟,尤其是在开启了publisherconfirm机制或者事务机制的情况下(Client深圳)会等待很长的延迟时间来接收(broker北京)的确认信息进而必然造成这条发送线程的性能降低,甚至造成一定程度上的 阻塞

将业务(Client深圳)部署到北京的机房可以解决这个问题,但是如果(Client深圳)调用的另些服务都部署在深圳,那么又会引发新的时延问题,这里使用Federation插件就可以很好地解决这个问题

Federation Queue

联邦队列可以在多个Broker节点或者集群之间为单个队列提供均衡负载的功能。一个联邦队列可以 连接一个或者多个上游队列(upstream queue)并从这些上游队列中获取消息以满足本地消费者消费消息的需求

Shovel

Federation具备的数据转发功能类似,Shovel够可靠、持续地从一个Broker中的队列(作为源端,即source)拉取数据并转发至另一个Broker中的交换器(作为目的端,即destination)作为源端的队列和作为目的端的交换器可以同时位于同一个Broker,也可以位于不同的Broker 上。Shovel行为就像优秀的客户端应用程序能够负责连接源和目的地、负责消息的读写及负责连接失败问题的处理


RabbitMQ
http://docs.mousse.cc/RabbitMQ/
作者
Mocha Mousse
发布于
2024年11月26日
许可协议