消息队列

消息指的是两个应用间传递的数据。数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象。

消息队列(Message Queue)是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两种角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。

消息队列的作用

消息主要有三个作用:

  • 解耦:如图所示。假设有系统B、C、D都需要系统A的数据,于是系统A调用三个方法发送数据到B、C、D。这时,系统D不需要了,那就需要在系统A把相关的代码删掉。假设这时有个新的系统E需要数据,这时系统A又要增加调用系统E的代码。为了降低这种强耦合,就可以使用MQ,系统A只需要把数据发送到MQ,其他系统如果需要数据,则从MQ中获取即可

  • 异步:如图所示。一个客户端请求发送进来,系统A会调用系统B、C、D三个系统,同步请求的话,响应时间就是系统A、B、C、D的总和,也就是800ms。如果使用 MQ,系统A发送数据到 MQ,然后就可以返回响应给客户端,不需要再等待系统B、C、D 的响应,可以大大地提高性能。对于一些非必要的业务,比如发送短信,发送邮件等等,就可以采用MQ。

  • 削峰:如图所示。这其实是MQ一个很重要的应用。假设系统A在某一段时间请求数暴增,有5000个请求发送过来,系统A这时就会发送5000条SQL 进入 MySQL 进行执行,MySQL 对于如此庞大的请求当然处理不过来,MySQL 就会崩溃,导致系统瘫痪。如果使用MQ,系统A不再是直接发送SQL到数据库,而是把数据发送到MQ,MQ短时间积压数据是可以接受的,然后由消费者每次拉取2000条进行处理,防止在请求峰值时期大量的请求直接发送到MySQL导致系统崩溃

RabbitMQ 的特点

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些 RabbitMQ的 特点,官网可查:

  • 可靠性: 支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略:这应该是 RabbitMQ 的一大特点。在消息进入MQ前由 Exchange(交换机) 进行路由消息。分发消息策略有:简单模式工作队列模式发布订阅模式路由模式通配符模式
  • 支持集群: 多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议: RabbitMQ 支持多种消息队列协议,比如 STOMPMQTT 等等。
  • 支持多种语言客户端: RabbitMQ 几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面: RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制: RabbitMQ 提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

RabbitMQ 的组成

RabbitMQ 主要有以下几部分:

  • Broker:消息队列服务进程。此进程包括两个部分:Exchange 和 Queue。
  • Exchange:消息队列交换机。按一定的规则将消息路由转发到某个队列
  • Queue:消息队列,存储消息的队列。
  • Producer:消息生产者。生产方客户端将消息同交换机路由发送到队列中。
  • Consumer:消息消费者。消费队列中存储的消息。

这些组成部分协同工作的流程如下:

  1. 消息生产者连接到 RabbitMQ Broker,创建 connection,开启channel。
  2. 生产者声明交换机类型、名称、是否持久化等。
  3. 生产者发送消息,并指定消息是否持久化等属性和 routing key
  4. exchange 收到消息之后,根据 routing key 路由到跟当前交换机绑定的相匹配的队列里面。
  5. 消费者监听接收到消息之后开始业务处理。

Exchange 的四种类型

从上面的工作流程可以看出,实际上有个关键的组件Exchange,因为消息发送到RabbitMQ后首先要经过Exchange路由才能找到对应的Queue

Exchange类型有四种,根据不同的类型工作的方式也有所不同。在HelloWord例子中,我们就使用了比较简单的Direct Exchange,翻译就是直连交换机。其余三种分别是:Fanout exchange、Topic exchange、Headers exchange

Direct Exchange

直连交换机意思是此交换机需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的,点对点的发送。

Fanout exchange

这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。

Topic Exchange

直接翻译的话叫做主题交换机,如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:“*” 、 “#”。需要注意的是通配符前面必须要加上"."符号。

* 符号:有且只匹配一个词。比如 a.*可以匹配到"a.b"、“a.c”,但是匹配不了"a.b.c"。

# 符号:匹配一个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”,也可以匹配到"rabbit.a.b.c"。

比较常用的就是以上三种:直连(DirectExchange),发布订阅(FanoutExchange),通配符(TopicExchange)。熟练运用这三种交换机类型,基本上可以解决大部分的业务场景。

Headers Exchange

这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。如图所示:

创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。

在 Java 中使用 RabbitMQ

第一步,在项目中添加 RabbitMQ 客户端依赖:

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>

第二步,模拟一个最简单的场景,一个生产者发送消息到队列中,一个消费者从队列中读取消息并打印。

新建生产者类 Producer:

public class Producer {
private final static String QUEUE_NAME = "test";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "测试消息";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送 '" + message + "'");
}
}
}

QUEUE_NAME 为队列名,也就是说,生产者发送的消息会放到 test 队列中。

通过以下方式创建服务器连接:

ConnectionFactory factory = new ConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel())

ConnectionFactory 是一个非常方便的工厂类,可用来创建到 RabbitMQ 的默认连接(主机名为“localhost”)。然后,创建一个通道( Channel)来发送消息。

Connection 和 Channel 类都实现了 Closeable 接口,所以可以使用 try-with-resource 语句。

在发送消息的时候,必须设置队列名称,通过 queueDeclare() 方法设置。

basicPublish() 方法用于发布消息:

  • 第一个参数为交换机(exchange),当前场景不需要,因此设置为空字符串;
  • 第二个参数为路由关键字(routingKey),暂时使用队列名填充;
  • 第三个参数为消息的其他参数(BasicProperties),暂时不配置;
  • 第四个参数为消息的主体,这里为 UTF-8 格式的字节数组,可以有效地杜绝中文乱码。

接下来新建消费者类 Consumer:

public class Consumer {
private final static String QUEUE_NAME = "test";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("等待接收消息");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 消费者接收到的消息 '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

创建通道的代码和生产者差不多,只不过没有使用 try-with-resource 语句来自动关闭连接和通道,因为我们希望消费者能够一直保持连接,直到我们强制关闭它。

在接收消息的时候,必须设置队列名称,通过 queueDeclare() 方法设置。

由于 RabbitMQ 将会通过异步的方式向我们推送消息,因此我们需要提供了一个回调,该回调将对消息进行缓冲,直到我们做好准备接收它们为止。

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 消费者接收到的消息 '" + message + "'");
};

basicConsume() 方法用于接收消息:

第一个参数为队列名(queue),和生产者相匹配(test)。

第二个参数为 autoAck,如果为 true 的话,表明服务器要一次性交付消息。怎么理解这个概念呢?小伙伴们可以在运行消费者类 Consumer 类之前,先多次运行生产者类 Producer,向队列中发送多个消息,等到消费者类启动后,你就会看到多条消息一次性接收到了。

等待接收消息
消费者接收到的消息 '测试消息'
消费者接收到的消息 '测试消息'
消费者接收到的消息 '测试消息'

第三个参数为 DeliverCallback,也就是消息的回调函数。

第四个参数为 CancelCallback,消费者被取消(例如队列被删除)时的回调函数.

在消息发送的过程中,也可以使用 RabbitMQ 的管理面板查看到消息的走势图,如下所示。