Skip to content

rabbitMQ基本使用

一、依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.12.0</version>
</dependency>

二、常量类

package com.lcy.cloud.video.constant;

/**
 * @Description rabbitmq常量类
 * @Author lcy
 * @Date 2021/8/3 17:37
 */
public class RabbitmqConstant {

    /**
     * 主机--ip
     */
    public static final String HOST = "121.40.98.21";

    /**
     * 端口
     */
    public static final int PORT = 5672;

    /**
     * 用户名
     */
    public static final String USERNAME = "guest";

    /**
     * 密码
     */
    public static final String PASSWORD = "guest";

    /**
     * 虚拟主机
     */
    public static final String VIRTUAL_HOST = "/dev";

    /**
     * 队列名称
     */
    public static final String QUEUE_NAME = "test";

    /**
     * 广播交换机名称
     */
    public static final String PUB_EXCHANGE_NAME = "pub_exchange";

    /**
     * 点对点交换机名称
     */
    public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";

    /**
     * topic交换机名称
     */
    public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";

}

三、公共方法类

package com.lcy.cloud.common.util;

import com.lcy.cloud.common.constant.RabbitmqConstant;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description rabbitmq工具类
 * @Author lcy
 * @Date 2021/8/3 17:47
 */
public class RabbitmqUtil {

    /**
     * 获取rabbitmq连接工厂类
     * @return com.rabbitmq.client.ConnectionFactory
     * @author lcy
     * @date 2021/8/3 17:50
     **/
    public static ConnectionFactory getConnectionFactory(){
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(RabbitmqConstant.HOST);
        connectionFactory.setPort(RabbitmqConstant.PORT);
        connectionFactory.setUsername(RabbitmqConstant.USERNAME);
        connectionFactory.setPassword(RabbitmqConstant.PASSWORD);
        connectionFactory.setVirtualHost(RabbitmqConstant.VIRTUAL_HOST);
        return connectionFactory;
    }

}

四、普通消息--点对点交换机(默认)

五、生产者

package com.lcy.cloud.video.product;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.common.constant.RabbitmqConstant;
import com.lcy.cloud.common.util.RabbitmqUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description rabbitmq普通队列消息生产者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class NormalRabbitMqProduct {

    public static void main(String[] args){
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        try ( Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            /*
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数:队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(RabbitmqConstant.QUEUE_NAME,false,false,false,null);
            String message = "message";
            channel.basicPublish("",RabbitmqConstant.QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

六、消费者

两种方式进行消费。

如果要使消费者一直监听消息,把获取连接自动关闭的方法提取到外面。

如果是多个消费者消费消息,默认是轮训的分配消费,如1-10条消息,1、3、5、7、9分配在了消费者1上其它的在消费者2上,当消费者1处理很缓慢(服务器资源)的时候,这个时候就会造成消息堆积。

通过设置 channel.basicQos(1); ,每次只拿一条消息进行消费

package com.lcy.cloud.video.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.common.constant.RabbitmqConstant;
import com.lcy.cloud.common.util.RabbitmqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;

/**
 * @Description TODO
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class NormalRabbitMqConsumer {

    public static void main(String[] args){
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            //每次只拿一条消息进行消费
            channel.basicQos(1);
            /*
             * 队列名称
             * 持久化配置:mq重启后还在
             * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
             * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
             * 其他参数:队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
             */
            channel.queueDeclare(RabbitmqConstant.QUEUE_NAME,false,false,false,null);
            //Consumer consumer = new DefaultConsumer(channel){
            //    @Override
            //    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            //        // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
            //        System.out.println("consumerTag消息标识="+consumerTag);
            //        //可以获取交换机,路由健等
            //        System.out.println("envelope元数据="+envelope);
            //        System.out.println("properties配置信息="+properties);
            //        System.out.println("body="+new String(body,StandardCharsets.UTF_8));
            //    }
            //};
            ////自动确认消息,但是生产环境是友好的,最好在处理完消息以后手动确认
            //channel.basicConsume(RabbitmqConstant.QUEUE_NAME,false,consumer);
            DeliverCallback deliverCallback = (consumerTag,message) -> {
                String body = new String(message.getBody(),StandardCharsets.UTF_8);
                System.out.println(" [x] Received '" + body + "'");
            };
            channel.basicConsume(RabbitmqConstant.QUEUE_NAME,true,deliverCallback,consumerTag -> { });
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

七、广播模式(发布订阅模式)

发布-订阅模型中,消息生产者不再是直接面对queue(队列名称),而是直面exchange,都需要经过exchange来进行消息的发送, 所有发往同一个fanout交换机的消息都会被所有监听这个交换机的消费者接收到。 发布订阅-消息模型引入fanout交换机。

  • 通过把消息发送给交换机,交互机转发给对应绑定的队列

  • 交换机绑定的队列是排它独占队列,自动删除

八、生产者

package com.lcy.cloud.video.product;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description rabbitmq广播消息生产者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class PubRabbitMqProduct {

    public static void main(String[] args){
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            String message = "pub message";
            //绑定交换机,fanout扇形,即广播类型
            channel.exchangeDeclare(RabbitmqConstant.PUB_EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
            //发送消息
            channel.basicPublish(RabbitmqConstant.PUB_EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

九、消费者

package com.lcy.cloud.video.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description 广播队列消费者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class PubRabbitMqConsumer {

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机,fanout扇形,即广播类型
        channel.exchangeDeclare(RabbitmqConstant.PUB_EXCHANGE_NAME,BuiltinExchangeType.FANOUT);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机,fanout交换机不用指定routingkey
        channel.queueBind(queueName,RabbitmqConstant.PUB_EXCHANGE_NAME,"");

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(),StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });

    }
}

十、路由模式

交换机类型是Direct,队列和交换机绑定,需要指定一个路由key( 也叫Bingding Key),消息生产者发送消息给交换机,需要指定routingKey,交换机根据消息的路由key, 转发给对应的队列

十一、生产者

package com.lcy.cloud.video.product;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description 路由模式消息生产者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class RouteRabbitMqProduct {

    public static void main(String[] args){
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            //绑定交换机
            channel.exchangeDeclare(RabbitmqConstant.DIRECT_EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
            String error = "error message";
            String info = "info message";
            String debug = "debug message";

            //发送消息
            channel.basicPublish(RabbitmqConstant.DIRECT_EXCHANGE_NAME,"errorKey",null,error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(RabbitmqConstant.DIRECT_EXCHANGE_NAME,"infoKey",null,info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(RabbitmqConstant.DIRECT_EXCHANGE_NAME,"debugKey",null,debug.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

十二、消费者

package com.lcy.cloud.video.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description 路由模式全部消息消费者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class RouteCommonRabbitMqConsumer {

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(RabbitmqConstant.DIRECT_EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机
        channel.queueBind(queueName,RabbitmqConstant.DIRECT_EXCHANGE_NAME,"errorKey");
        channel.queueBind(queueName,RabbitmqConstant.DIRECT_EXCHANGE_NAME,"infoKey");
        channel.queueBind(queueName,RabbitmqConstant.DIRECT_EXCHANGE_NAME,"debugKey");

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(),StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });

    }
}
package com.lcy.cloud.video.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description 路由模式errorKey消息消费者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class RouteErrorRabbitMqConsumer {

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(RabbitmqConstant.DIRECT_EXCHANGE_NAME,BuiltinExchangeType.DIRECT);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机
        channel.queueBind(queueName,RabbitmqConstant.DIRECT_EXCHANGE_NAME,"errorKey");

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(),StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });

    }
}

十三、主题模式topic

交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等

交换机同过通配符进行转发到对应的队列,* 代表一个词,#代表1个或多个词,一般用#作为通配符居多,比如 #.order, 会匹配 info.order 、sys.error.order, 而 *.order , 只会匹配 info.order, 之间是使用. 点进行分割多个词的; 如果是 ., 则info.order、error.order都会匹配

注意:

  • 交换机和队列绑定时用的binding使用通配符的路由健

  • 生产者发送消息时需要使用具体的路由健

十四、生产者

package com.lcy.cloud.video.product;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Description topic消息生产者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class TopicRabbitMqProduct {

    public static void main(String[] args){
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        try (Connection connection = connectionFactory.newConnection();
                Channel channel = connection.createChannel()) {
            //绑定交换机
            channel.exchangeDeclare(RabbitmqConstant.TOPIC_EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
            String error = "error message";
            String info = "info message";
            String debug = "debug message";

            //发送消息
            channel.basicPublish(RabbitmqConstant.TOPIC_EXCHANGE_NAME,"error.key",null,error.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(RabbitmqConstant.TOPIC_EXCHANGE_NAME,"info.key",null,info.getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(RabbitmqConstant.TOPIC_EXCHANGE_NAME,"debug.key",null,debug.getBytes(StandardCharsets.UTF_8));
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        }
    }

}

十五、消费者

package com.lcy.cloud.video.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description topic所有消息消费者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class TopicCommonRabbitMqConsumer {

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(RabbitmqConstant.TOPIC_EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机
        channel.queueBind(queueName,RabbitmqConstant.TOPIC_EXCHANGE_NAME,"*.key");

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(),StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });

    }
}
package com.lcy.cloud.video.consumer;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
 * @Description topic错误消息消费者
 * @Author lcy
 * @Date 2021/8/3 17:21
 */
public class TopicErrorRabbitMqConsumer {

    public static void main(String[] args) throws IOException, TimeoutException{
        ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();

        //绑定交换机
        channel.exchangeDeclare(RabbitmqConstant.TOPIC_EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
        //获取队列(排它队列)
        String queueName = channel.queueDeclare().getQueue();
        //绑定队列和交换机
        channel.queueBind(queueName,RabbitmqConstant.TOPIC_EXCHANGE_NAME,"error.key");

        DeliverCallback deliverCallback = (consumerTag,delivery) -> {
            String message = new String(delivery.getBody(),StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag -> { });

    }
}