侧边栏壁纸
博主头像
张种恩的技术小栈博主等级

行动起来,活在当下

  • 累计撰写 748 篇文章
  • 累计创建 65 个标签
  • 累计收到 39 条评论

目 录CONTENT

文章目录

消息中间件RabbitMQ(二)之Java客户端操作

zze
zze
2019-10-16 / 0 评论 / 0 点赞 / 445 阅读 / 27740 字

不定期更新相关视频,抖音点击左上角加号后扫一扫右方侧边栏二维码关注我~正在更新《Shell其实很简单》系列

在上篇【消息中间件RabbitMQ(一)之初见】中已经了解了 RabbitMQ 的安装、相关概念以及图形化管理界面的使用,接下来学习一下使用 Java 客户端来操作 RabbitMQ。

入门程序

依赖

1、使用 Maven 创建生产者工程和消费者 Java 工程,分别引入 RabbitMQ 客户端依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.0.3</version> <!-- 此版本与 Spring Boot 1.5.9 版本匹配 -->
</dependency>

生产者

2、编写生产者程序:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ 入门程序-生产者
 */
public class Producer01 {
    /**
     * 队列名称
     */
    private static final String QUEUE_NAME = "helloworld";

    public static void main(String[] args) throws Exception {
        //  通过连接工厂创建和 mq 服务的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
        connectionFactory.setPort(5672);        // mq 服务暴露的端口
        connectionFactory.setUsername("guest"); // 用户名
        connectionFactory.setPassword("guest"); // 密码
        connectionFactory.setVirtualHost("/");  // 设置虚拟机路径
        // 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建新连接
            connection = connectionFactory.newConnection();
            // 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
            channel = connection.createChannel();
            // 声明队列,如果队列不存在,则创建
            /**
             * 参数列表:
             *  String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *      queue:队列名称
             *      durable:是否持久化,mq 重启后队列依旧存在
             *      exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
             *      autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
             *      arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 发布消息
            /**
             * 参数列表:
             *  String exchange, String routingKey, BasicProperties props, byte[] body
             *      exchange:交换机,如果不指定则使用默认的交换机
             *      routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
             *      props:消息属性
             *      body:消息体,消息内容
             */
            String msg = "hello world";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            System.out.printf("Send msg " + msg + " to " + QUEUE_NAME);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        }
    }
}

执行该生产者程序,执行完毕后可通过管理界面查看到队列及消息信息:
image.png
生产者发送消息流程小结:

  1. 创建连接;
  2. 创建通道;
  3. 声明队列;
  4. 发送消息;

消费者

3、编写消费者程序:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMQ 入门程序-消费者
 */
public class Consumer01 {

    private static final String QUEUE_NAME = "helloworld";

    public static void main(String[] args) throws Exception {
        //  通过连接工厂创建和 mq 服务的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
        connectionFactory.setPort(5672);        // mq 服务暴露的端口
        connectionFactory.setUsername("guest"); // 用户名
        connectionFactory.setPassword("guest"); // 密码
        connectionFactory.setVirtualHost("/");  // 设置虚拟机路径
        // 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建新连接
            connection = connectionFactory.newConnection();
            // 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
            channel = connection.createChannel();
            // 声明队列,如果队列不存在,则创建
            /**
             * 参数列表:
             *  String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *      queue:队列名称
             *      durable:是否持久化,mq 重启后队列依旧存在
             *      exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
             *      autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
             *      arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
             */
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 监听队列接收(消费)消息
            /**
             * 参数列表:
             *  String queue, boolean autoAck, String consumerTag, Consumer callback
             *      queue:监听的队列名称
             *      autoAck:自动回复,当消费者接收到消息后要告诉 mq 消息已接收,如果将此参数设置为 true 那么将会自动回复 mq,设置为 false 时需要手动编程回复 mq
             *      consumerTag:消费者标签,消费者的标识,可选
             *      callback:接收到消息时的回调对象
             */
            channel.basicConsume(QUEUE_NAME, true,"Consumer01", new DefaultConsumer(channel){
                /**
                 * 接收到消息时该方法将被调用
                 * @param consumerTag 消费者标签,消费者标识,使用 basicConsume 方法时使用的标签名称
                 * @param envelope 信封,存放了消息的相关信息,如发送该消息的交换机、该消息在 mq 中的标识
                 * @param properties 消息属性,生产者使用 channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 发送消息时传递的 props 属性信息
                 * @param body 消息内容
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String exchange = envelope.getExchange(); // 获取发送消息的交换机
                    long deliveryTag = envelope.getDeliveryTag(); // 消息在 mq 中的标识 id,可用于消息接收确认
                    System.out.println("receive msg from "+ consumerTag + ":"+ new String(body, "utf-8"));
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
        // 消费者不关闭连接则持续监听接收(消费)队列中的消息
        //finally {
        //    if (channel != null) channel.close();
        //    if (connection != null) connection.close();
        //}
    }
}

执行该生产者程序,执行完毕后可通过管理界面查看到队列中的消息已被消费(接收),且之后生产者每发送一条消息到队列,消费者也随之消费一条消息;
消费者接收消息流程小结:

  1. 创建连接;
  2. 创建通道;
  3. 声明队列(如果能够确定队列存在,则可忽略);
  4. 监听队列接收消息;
  5. 回复 mq(如果设置了自动回复可忽略);

工作模式

RabbitMQ 有如下几种工作模式:

  1. Simple:简单模式;
  2. Work queues:工作队列模式;
  3. Publish/Subscribe:发布订阅模式;
  4. Routing:路由模式;
  5. Topics:通配符模式;
  6. Header:Header 转发器模式;
  7. RPC:远程过程调用模式;

各个工作模式使用的交换机类型可能是不一样的,如下:

  • Fanout Exchange 对应发布订阅模式;
  • Direct Exchange 对应路由模式;
  • Topic Exchange 对应通配符模式;
  • Headers Exchange 对应 Header 转发器模式;

上述交换机类型都定义在 com.rabbitmq.client.BuiltinExchangeType 枚举类中。

Simple 简单模式

image.png

生产者将消息放入队列, 消费者(Consumer) 监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,消息自动从队列中删除。

隐患:消息可能没有被消费者正确处理,但该消息已经从队列中消失了,造成消息的丢失。
应用场景:聊天。

示例

上面入门程序使用的就是简单模式。

Work queues 工作队列模式

image.png

生产者将消息放入队列,消费者可以有多个。如消费者 C1 、消费者 C2 同时监听同一个队列。
C1 和 C2 共同消费当前的消息队列内容,Rabbit MQ 默认使用轮询的方式将消息平均发送给各个消费者。
上图中虽然没有出现交换机,但实际上是使用了默认的交换机。

隐患:高并发情况下,可能会产生某一个消息被多个消费者共同使用,此时可以设置同步,保证一条消息只能被一个消费者使用。
应用场景:抢红包,大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)。

示例

将入门程序中的消费者程序启动多个,则就是工作队列模式。

publish/subscribe 发布订阅模式

image.png

X 代表交换机,是 RabbitMQ 内部组件,生产者将消息放入交换机,交换机负责把消息发送到所有绑定了该交换机的队列,对应消息队列的消费者拿到消息进行消费。

应用场景:邮件群发、群聊天、广播、广告。

示例

编写生产者程序:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ 发布订阅模式-生产者
 */
public class Producer02 {
    private static final String FANOUT_EXCHANGE = "test_fanout_exchange";
    private static final String QUEUE_1 = "queue_1";
    private static final String QUEUE_2 = "queue_2";

    public static void main(String[] args) throws Exception {
        //  通过连接工厂创建和 mq 服务的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
        connectionFactory.setPort(5672);        // mq 服务暴露的端口
        connectionFactory.setUsername("guest"); // 用户名
        connectionFactory.setPassword("guest"); // 密码
        connectionFactory.setVirtualHost("/");  // 设置虚拟机路径
        // 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建新连接
            connection = connectionFactory.newConnection();
            // 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
            channel = connection.createChannel();
            // 声明队列,如果队列不存在,则创建
            /**
             * 参数列表:
             *  String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *      queue:队列名称
             *      durable:是否持久化,mq 重启后队列依旧存在
             *      exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
             *      autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
             *      arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
             */
            channel.queueDeclare(QUEUE_1, true, false, false, null);
            channel.queueDeclare(QUEUE_2, true, false, false, null);
            // 声明交换机
            /**
             * 参数列表:
             *  String exchange, String type
             *      exchange:交换机名称
             *      type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
             */
            channel.exchangeDeclare(FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT);

            // 绑定交换机和队列
            /**
             * 参数列表:
             *  String queue, String exchange, String routingKey
             *      queue:队列名称
             *      exchange:交换机名称
             *      routingKey:路由 key,在发布订阅模式中无用,设为 "" 即可
             */
            channel.queueBind(QUEUE_1, FANOUT_EXCHANGE, "");
            channel.queueBind(QUEUE_2, FANOUT_EXCHANGE, "");

            // 发布消息给交换机
            /**
             * 参数列表:
             *  String exchange, String routingKey, BasicProperties props, byte[] body
             *      exchange:交换机,如果不指定则使用默认的交换机
             *      routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
             *      props:消息属性
             *      body:消息体,消息内容
             */
            String msg = "hello world";
            channel.basicPublish(FANOUT_EXCHANGE, "", null, msg.getBytes());
            System.out.printf("Send msg " + msg + " to " + FANOUT_EXCHANGE);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        }
    }
}

发布订阅模式对应的交换机类型为 Fanout,所以该生产者程序在入门程序的基础上添加了手动定义的 Fanout 类型的交换机,并且声明了 2 个队列绑定到了该交换机。
执行该程序,查看会发现两个队列都接受到了消息:
image.png

Routing 路由模式

image.png

生产者将消息携带一个路由 key(字符串) 发送给交换机,交换机根据路由 key,匹配到与该路由 key 对应的消息队列,对应消息队列的消费者消费消息。

应用场景:消息分类通知(利用路由 key 可将消息分类)。

示例

编写生产者程序:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ 路由模式-生产者
 */
public class Producer03 {
    private static final String DIRECT_EXCHANGE = "test_direct_exchange";
    private static final String QUEUE_3 = "queue_3";
    private static final String QUEUE_4 = "queue_4";

    /**
     * queue_3 使用的路由 key
     */
    private static final String QUEUE_3_ROUTING_KEY = "queue_3_key";
    /**
     * queue_4 使用的路由 key
     */
    private static final String QUEUE_4_ROUTING_KEY = "queue_4_key";

    public static void main(String[] args) throws Exception {
        //  通过连接工厂创建和 mq 服务的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
        connectionFactory.setPort(5672);        // mq 服务暴露的端口
        connectionFactory.setUsername("guest"); // 用户名
        connectionFactory.setPassword("guest"); // 密码
        connectionFactory.setVirtualHost("/");  // 设置虚拟机路径
        // 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建新连接
            connection = connectionFactory.newConnection();
            // 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
            channel = connection.createChannel();
            // 声明队列,如果队列不存在,则创建
            /**
             * 参数列表:
             *  String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *      queue:队列名称
             *      durable:是否持久化,mq 重启后队列依旧存在
             *      exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
             *      autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
             *      arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
             */
            channel.queueDeclare(QUEUE_3, true, false, false, null);
            channel.queueDeclare(QUEUE_4, true, false, false, null);
            // 声明交换机
            /**
             * 参数列表:
             *  String exchange, String type
             *      exchange:交换机名称
             *      type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
             */
            channel.exchangeDeclare(DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT);

            // 绑定交换机和队列
            /**
             * 参数列表:
             *  String queue, String exchange, String routingKey
             *      queue:队列名称
             *      exchange:交换机名称
             *      routingKey:路由 key
             */
            channel.queueBind(QUEUE_3, DIRECT_EXCHANGE, QUEUE_3_ROUTING_KEY);
            channel.queueBind(QUEUE_4, DIRECT_EXCHANGE, QUEUE_4_ROUTING_KEY);

            // 发布消息给交换机
            /**
             * 参数列表:
             *  String exchange, String routingKey, BasicProperties props, byte[] body
             *      exchange:交换机,如果不指定则使用默认的交换机
             *      routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
             *      props:消息属性
             *      body:消息体,消息内容
             */
            String routingKey = "";
            for (int i = 0; i < 10; i++) {
                String msg = "hello world" + i;
                routingKey = i % 2 == 0 ? QUEUE_3_ROUTING_KEY : QUEUE_4_ROUTING_KEY;
                channel.basicPublish(DIRECT_EXCHANGE, routingKey, null, msg.getBytes());
                System.out.println("Send msg " + msg + " to " + DIRECT_EXCHANGE);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        }
    }
}

路由模式对应的交换机类型为 Direct,所以该生产者程序在入门程序的基础上添加了手动定义的 Direct 类型的交换机,并且声明了 2 个队列绑定到了该交换机,在发布消息时也是直接发送给交换机,但发送消息的同时指定了路由 key 让消息分发到两个队列。
执行该程序,查看会发现两个队列都接受到了消息:
image.png

Topics 通配符模式

image.png

符号 *# 都是通配符,# 可匹配一个或多个词,* 匹配单个词,每个单词间使用 . 分隔。
实际上就是在路由模式的基础上添加了模糊匹配功能,生产者把消息携带一个带通配符的路由 key 交给交换机,交换机根据路由 key 的规则模糊匹配到对应的队列,由对应队列的消费者消费消息;

示例

编写生产者程序:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ 通配符模式-生产者
 */
public class Producer04 {
    private static final String TOPIC_EXCHANGE = "test_topic_exchange";
    private static final String QUEUE_5 = "queue_5";
    private static final String QUEUE_6 = "queue_6";

    /**
     * queue_5 使用的路由 key
     */
    private static final String QUEUE_5_ROUTING_KEY = "msg.#.queue_5.#";
    /**
     * queue_6 使用的路由 key
     */
    private static final String QUEUE_6_ROUTING_KEY = "msg.#.queue_6.#";

    public static void main(String[] args) throws Exception {
        //  通过连接工厂创建和 mq 服务的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
        connectionFactory.setPort(5672);        // mq 服务暴露的端口
        connectionFactory.setUsername("guest"); // 用户名
        connectionFactory.setPassword("guest"); // 密码
        connectionFactory.setVirtualHost("/");  // 设置虚拟机路径
        // 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建新连接
            connection = connectionFactory.newConnection();
            // 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
            channel = connection.createChannel();
            // 声明队列,如果队列不存在,则创建
            /**
             * 参数列表:
             *  String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *      queue:队列名称
             *      durable:是否持久化,mq 重启后队列依旧存在
             *      exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
             *      autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
             *      arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
             */
            channel.queueDeclare(QUEUE_5, true, false, false, null);
            channel.queueDeclare(QUEUE_6, true, false, false, null);
            // 声明交换机
            /**
             * 参数列表:
             *  String exchange, String type
             *      exchange:交换机名称
             *      type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
             */
            channel.exchangeDeclare(TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC);

            // 绑定交换机和队列
            /**
             * 参数列表:
             *  String queue, String exchange, String routingKey
             *      queue:队列名称
             *      exchange:交换机名称
             *      routingKey:路由 key
             */
            channel.queueBind(QUEUE_5, TOPIC_EXCHANGE, QUEUE_5_ROUTING_KEY);
            channel.queueBind(QUEUE_6, TOPIC_EXCHANGE, QUEUE_6_ROUTING_KEY);

            // 发布消息给交换机
            /**
             * 参数列表:
             *  String exchange, String routingKey, BasicProperties props, byte[] body
             *      exchange:交换机,如果不指定则使用默认的交换机
             *      routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
             *      props:消息属性
             *      body:消息体,消息内容
             */
            String routingKey = "";
            for (int i = 0; i < 10; i++) {
                String msg = "hello world" + i;
                routingKey = i % 2 == 0 ? "msg.queue_5" : "msg.queue_6";
                channel.basicPublish(TOPIC_EXCHANGE, routingKey, null, msg.getBytes());
                System.out.println("Send msg " + msg + " to " + TOPIC_EXCHANGE);
            }

            channel.basicPublish(TOPIC_EXCHANGE, "msg.queue_5.queue_6", null, "hello world to all queues".getBytes());

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        }
    }
}

通配符模式对应的交换机类型为 Topic,所以该生产者程序在入门程序的基础上添加了手动定义的 Topic 类型的交换机,并且声明了 2 个队列绑定到了该交换机,绑定时的路由 key 分别为 msg.#.queue_5.#msg.#.queue_6.#,之后分别发送了 5 条消息携带的路由 key 分别为 msg.queue_5msg.queue_6,且在最后还发送了一条消息路由 key 为 msg.queue_5.queue_6 ,路由 key msg.queue_5msg.queue_6 分别能匹配 msg.#.queue_5.#msg.#.queue_6.# 绑定的队列,而 msg.queue_5.queue_6 既能匹配到 msg.#.queue_5.# 绑定的队列,也能匹配到 msg.#.queue_6.# 绑定的队列,所以两个队列都应该接收到 6 条消息。
执行该程序,查看会发现两个队列都接受到了消息:
image.png

Header 转发器模式

Header 模式与 Routing 模式类似,不同的地方在于 Header 模式不使用路由 key,而是使用 Header 中的 key/value(键值对)来匹配队列。

示例

import com.rabbitmq.client.*;

import java.util.HashMap;

/**
 * RabbitMQ Header 模式-生产者
 */
public class Producer05 {
    private static final String HEADER_EXCHANGE = "test_header_exchange";
    private static final String QUEUE_7 = "queue_7";
    private static final String QUEUE_8 = "queue_8";

    /**
     * queue_5 使用的路由 key
     */
    private static final String QUEUE_7_ROUTING_KEY = "queue_7_key";
    /**
     * queue_6 使用的路由 key
     */
    private static final String QUEUE_8_ROUTING_KEY = "queue_8_key";

    public static void main(String[] args) throws Exception {
        //  通过连接工厂创建和 mq 服务的连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.146.131"); // mq 服务主机地址
        connectionFactory.setPort(5672);        // mq 服务暴露的端口
        connectionFactory.setUsername("guest"); // 用户名
        connectionFactory.setPassword("guest"); // 密码
        connectionFactory.setVirtualHost("/");  // 设置虚拟机路径
        // 一个 mq 的服务可以设置多个虚拟机路径,每个虚拟机路径对应一个虚拟机(相当于一个独立的 mq 服务),即每个虚拟机路径下的 mq 服务环境是相互隔离的。
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建新连接
            connection = connectionFactory.newConnection();
            // 创建会话通道(channel),客户端和 mq 服务都是通过连接中的会话通道通信
            channel = connection.createChannel();
            // 声明队列,如果队列不存在,则创建
            /**
             * 参数列表:
             *  String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
             *      queue:队列名称
             *      durable:是否持久化,mq 重启后队列依旧存在
             *      exclusive:是否排他,独占该连接,队列仅能被此连接访问,如果连接关闭,则队列自动删除
             *      autoDelete:自动删除,队列不再使用时自动删除此队列,如果和 exclusive 都为 true,则定义的队列就是临时队列
             *      arguments:参数,可设置队列的扩展参数,如设置队列的存活时间等...
             */
            channel.queueDeclare(QUEUE_7, true, false, false, null);
            channel.queueDeclare(QUEUE_8, true, false, false, null);
            // 声明交换机
            /**
             * 参数列表:
             *  String exchange, String type
             *      exchange:交换机名称
             *      type:交换机类型,具体信息可参考上篇文章中的 Exchange 类型
             */
            channel.exchangeDeclare(HEADER_EXCHANGE, BuiltinExchangeType.HEADERS);

            // 绑定交换机和队列
            /**
             * 参数列表:
             *  String queue, String exchange, String routingKey, Map<String, Object> arguments
             *      queue:队列名称
             *      exchange:交换机名称
             *      routingKey:路由 key
             *      arguments:Headers 参数
             */
            HashMap<String, Object> queue7Map = new HashMap<>();
            queue7Map.put("routing_key",QUEUE_7_ROUTING_KEY);
            HashMap<String, Object> queue8Map = new HashMap<>();
            queue8Map.put("routing_key",QUEUE_8_ROUTING_KEY);

            channel.queueBind(QUEUE_7, HEADER_EXCHANGE, "",queue7Map);
            channel.queueBind(QUEUE_8, HEADER_EXCHANGE, "",queue8Map);

            // 发布消息给交换机
            /**
             * 参数列表:
             *  String exchange, String routingKey, BasicProperties props, byte[] body
             *      exchange:交换机,如果不指定则使用默认的交换机
             *      routingKey:路由键,交换机按路由键来将消息转发到指定的队列,如果使用默认交换机,那么 routingKey 要设为队列的名称
             *      props:消息属性
             *      body:消息体,消息内容
             */
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.headers(queue7Map);
            channel.basicPublish(HEADER_EXCHANGE, "", builder.build(), "hello queue_7".getBytes());
            builder.headers(queue8Map);
            channel.basicPublish(HEADER_EXCHANGE, "", builder.build(), "hello queue_8".getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (channel != null) channel.close();
            if (connection != null) connection.close();
        }
    }
}

通配符模式对应的交换机类型为 Headers,所以该生产者程序在入门程序的基础上添加了手动定义的 Headers 类型的交换机,并且声明了 2 个队列绑定到了该交换机。
绑定时没有设置路由,而是设置了 map 属性,之后发送消息时也指定一个 map,交换机会根据 map 的键值匹配并将消息转发到对应的消息队列。
执行该程序,查看会发现两个队列分别接受到了 1 条消息:
image.png

RPC 远程过程调用模式

image.png

RPC 即客户端远程调用服务端的方法 ,使用 MQ 可以实现 RPC 的异步调用,基于 Direct 交换机实现,流程如下:

  1. 客户端既是生产者就是消费者,向 RPC 请求队列发送 RPC 调用消息,同时监听 RPC 响应队列;
  2. 服务端监听 RPC 请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果;
  3. 服务端将 RPC 方法的结果发送到 RPC 响应队列;
  4. 客户端(RPC 调用方)监听 RPC 响应队列,接收到 RPC 调用结果;

SpringBoot整合RabbitMQ

参考 https://www.zze.xyz/archives/springboot31.html

0
MQ

评论区