RabbitMQ 基本使用
一、依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
二、常量类
package com.lcy.cloud.video.constant;
/**
* @Description rabbitmq常量类
* @Author lcy
* @Date 2021/8/3 17:37
*/
public class RabbitmqConstant {
/**
 * Broker host (IP address).
 */
public static final String HOST = "121.40.98.21";
/**
 * Broker port.
 */
public static final int PORT = 5672;
/**
 * Login user name.
 * NOTE(review): RabbitMQ's built-in "guest" account is normally restricted to
 * localhost connections - confirm the broker at HOST is configured to accept it.
 */
public static final String USERNAME = "guest";
/**
 * Login password.
 */
public static final String PASSWORD = "guest";
/**
 * Virtual host.
 */
public static final String VIRTUAL_HOST = "/dev";
/**
 * Queue name.
 */
public static final String QUEUE_NAME = "test";
/**
 * Name of the broadcast (publish/subscribe) exchange.
 */
public static final String PUB_EXCHANGE_NAME = "pub_exchange";
/**
 * Name of the point-to-point (direct) exchange.
 */
public static final String DIRECT_EXCHANGE_NAME = "direct_exchange";
/**
 * Name of the topic exchange.
 */
public static final String TOPIC_EXCHANGE_NAME = "topic_exchange";
}
三、公共方法类
package com.lcy.cloud.common.util;
import com.lcy.cloud.common.constant.RabbitmqConstant;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description rabbitmq工具类
* @Author lcy
* @Date 2021/8/3 17:47
*/
public class RabbitmqUtil {
/**
 * Builds a RabbitMQ {@link ConnectionFactory} configured from {@link RabbitmqConstant}.
 *
 * <p>A fresh factory is created on every call and populated with the host, port,
 * user name, password and virtual host constants.
 *
 * @return the configured connection factory
 */
public static ConnectionFactory getConnectionFactory(){
    final ConnectionFactory factory = new ConnectionFactory();

    // Where to connect.
    factory.setPort(RabbitmqConstant.PORT);
    factory.setHost(RabbitmqConstant.HOST);

    // Who connects, and to which virtual host.
    factory.setVirtualHost(RabbitmqConstant.VIRTUAL_HOST);
    factory.setPassword(RabbitmqConstant.PASSWORD);
    factory.setUsername(RabbitmqConstant.USERNAME);

    return factory;
}
}
四、普通消息--点对点交换机(默认)
五、生产者
package com.lcy.cloud.video.product;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.common.constant.RabbitmqConstant;
import com.lcy.cloud.common.util.RabbitmqUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description rabbitmq普通队列消息生产者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class NormalRabbitMqProduct {
/**
 * Publishes a single text message to {@code RabbitmqConstant.QUEUE_NAME} through the
 * default exchange (empty exchange name, queue name used as routing key).
 *
 * <p>The connection and channel are opened in a try-with-resources block, so both are
 * closed as soon as the message has been handed to the client library.
 *
 * @param args unused
 */
public static void main(String[] args){
    final byte[] payload = "message".getBytes(StandardCharsets.UTF_8);
    final ConnectionFactory factory = RabbitmqUtil.getConnectionFactory();

    try (Connection conn = factory.newConnection();
         Channel ch = conn.createChannel()) {
        /*
         * queueDeclare arguments, in order:
         *   1. queue name
         *   2. durable    - whether the queue survives a broker restart
         *   3. exclusive  - whether only this connection may use the queue (and the queue
         *                   is removed when the connection closes); usually false,
         *                   publish/subscribe consumers use exclusive queues
         *   4. autoDelete - whether the queue is removed once it has no consumers; usually false
         *   5. arguments  - additional queue arguments (none here)
         * The queue is created if it does not exist. If it already exists it is not
         * overwritten, so the properties passed here have to agree with the existing ones.
         */
        ch.queueDeclare(RabbitmqConstant.QUEUE_NAME, false, false, false, null);
        ch.basicPublish("", RabbitmqConstant.QUEUE_NAME, null, payload);
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
}
六、消费者
两种方式进行消费。
如果要使消费者一直监听消息,就不能把连接(Connection)和信道(Channel)放在 try-with-resources 里创建(否则注册完消费者后连接会立即被自动关闭),而应在 try-with-resources 之外创建并保持连接打开。
如果是多个消费者消费同一个队列,默认采用轮询方式分配消息,例如第 1-10 条消息中,1、3、5、7、9 分配给消费者1,其余的分配给消费者2;当消费者1处理很缓慢(例如服务器资源不足)时,就会造成消息堆积。
通过设置 channel.basicQos(1); 并使用手动确认(basicConsume 的 autoAck 参数为 false,处理完成后调用 basicAck),消费者每次只会领取一条未确认的消息进行消费;在自动确认模式下该设置不起作用。
package com.lcy.cloud.video.consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.common.constant.RabbitmqConstant;
import com.lcy.cloud.common.util.RabbitmqUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
/**
* @Description TODO
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class NormalRabbitMqConsumer {
/**
 * Starts a consumer on {@code RabbitmqConstant.QUEUE_NAME} that prints every message body.
 *
 * <p>The connection and channel are deliberately NOT opened in a try-with-resources block:
 * closing them right after {@code basicConsume} would cancel the consumer before it can
 * receive anything. They stay open for the lifetime of the process and are only closed
 * here when the setup itself fails.
 *
 * <p>Messages are acknowledged manually after they have been handled. The prefetch limit
 * set by {@code basicQos(1)} only applies to manually acknowledged deliveries, so with
 * auto-ack the "one message at a time" behaviour would not take effect.
 *
 * @param args unused
 */
public static void main(String[] args){
    ConnectionFactory connectionFactory = RabbitmqUtil.getConnectionFactory();
    Connection connection = null;
    try {
        connection = connectionFactory.newConnection();
        final Channel channel = connection.createChannel();
        // Hand this consumer at most one unacknowledged message at a time.
        channel.basicQos(1);
        /*
         * queueDeclare arguments, in order:
         *   1. queue name
         *   2. durable    - whether the queue survives a broker restart
         *   3. exclusive  - whether only this connection may use the queue (and the queue
         *                   is removed when the connection closes); usually false,
         *                   publish/subscribe consumers use exclusive queues
         *   4. autoDelete - whether the queue is removed once it has no consumers; usually false
         *   5. arguments  - additional queue arguments (none here)
         * The queue is created if it does not exist. If it already exists it is not
         * overwritten, so the properties passed here have to agree with the existing ones.
         */
        channel.queueDeclare(RabbitmqConstant.QUEUE_NAME,false,false,false,null);

        // Alternative way to consume: subclass DefaultConsumer instead of using a DeliverCallback.
        //Consumer consumer = new DefaultConsumer(channel){
        //    @Override
        //    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        //        // consumerTag stays the same for this consumer; the delivery tag increases by one per message
        //        System.out.println("consumerTag消息标识="+consumerTag);
        //        // the envelope exposes the exchange, routing key, etc.
        //        System.out.println("envelope元数据="+envelope);
        //        System.out.println("properties配置信息="+properties);
        //        System.out.println("body="+new String(body,StandardCharsets.UTF_8));
        //        // manual acknowledgement once the message has been handled
        //        channel.basicAck(envelope.getDeliveryTag(),false);
        //    }
        //};
        //// autoAck=false: in production, acknowledge manually after processing instead of auto-acking
        //channel.basicConsume(RabbitmqConstant.QUEUE_NAME,false,consumer);

        DeliverCallback deliverCallback = (consumerTag,message) -> {
            String body = new String(message.getBody(),StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + body + "'");
            // Acknowledge only this delivery, and only after it has been processed.
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        };
        // autoAck=false so that the prefetch limit above is honoured.
        channel.basicConsume(RabbitmqConstant.QUEUE_NAME,false,deliverCallback,consumerTag -> { });
    } catch (IOException | TimeoutException e) {
        // Setup failed: release the connection so the process does not linger half-initialised.
        if (connection != null && connection.isOpen()) {
            try {
                connection.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
        }
        e.printStackTrace();
    }
}
}
七、广播模式(发布订阅模式)
发布-订阅模型中,消息生产者不再是直接面对queue(队列名称),而是直面exchange,都需要经过exchange来进行消息的发送, 所有发往同一个fanout交换机的消息都会被所有监听这个交换机的消费者接收到。 发布订阅-消息模型引入fanout交换机。
通过把消息发送给交换机,交换机再转发给所有与之绑定的队列
交换机绑定的队列是排它独占队列,自动删除
八、生产者
package com.lcy.cloud.video.product;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description rabbitmq广播消息生产者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class PubRabbitMqProduct {
/**
 * Publishes one message to the fanout exchange {@code RabbitmqConstant.PUB_EXCHANGE_NAME}.
 *
 * <p>The exchange is declared first, then the message is published with an empty routing key.
 * Connection and channel are closed by the try-with-resources block afterwards.
 *
 * @param args unused
 */
public static void main(String[] args){
    final String exchange = RabbitmqConstant.PUB_EXCHANGE_NAME;
    final byte[] payload = "pub message".getBytes(StandardCharsets.UTF_8);

    try (Connection conn = RabbitmqUtil.getConnectionFactory().newConnection();
         Channel ch = conn.createChannel()) {
        // Declare the exchange as fanout, i.e. broadcast.
        ch.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
        // Send the message; no routing key is given for the fanout exchange.
        ch.basicPublish(exchange, "", null, payload);
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
}
九、消费者
package com.lcy.cloud.video.consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description 广播队列消费者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class PubRabbitMqConsumer {
/**
 * Subscribes to the fanout exchange {@code RabbitmqConstant.PUB_EXCHANGE_NAME} and prints
 * every message received.
 *
 * <p>The connection and channel are never closed in this method, so the consumer keeps
 * listening after registration.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 */
public static void main(String[] args) throws IOException, TimeoutException{
    final String exchange = RabbitmqConstant.PUB_EXCHANGE_NAME;

    // Handler for incoming deliveries: decode the body as UTF-8 and print it.
    final DeliverCallback onMessage = (tag, delivery) ->
        System.out.println(" [x] Received '" + new String(delivery.getBody(), StandardCharsets.UTF_8) + "'");

    final Channel ch = RabbitmqUtil.getConnectionFactory().newConnection().createChannel();

    // Declare the exchange as fanout, i.e. broadcast.
    ch.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT);
    // Let the broker create a queue for this consumer (exclusive queue) and take its name.
    final String queue = ch.queueDeclare().getQueue();
    // Bind the queue to the exchange; a fanout exchange needs no routing key.
    ch.queueBind(queue, exchange, "");

    // Second argument true: deliveries are acknowledged automatically.
    ch.basicConsume(queue, true, onMessage, tag -> { });
}
}
十、路由模式
交换机类型是 Direct,队列和交换机绑定时需要指定一个路由 key(也叫 Binding Key);消息生产者发送消息给交换机时需要指定 routingKey,交换机根据消息的路由 key 把消息转发给对应的队列
十一、生产者
package com.lcy.cloud.video.product;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description 路由模式消息生产者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class RouteRabbitMqProduct {
/**
 * Publishes three messages to the direct exchange
 * {@code RabbitmqConstant.DIRECT_EXCHANGE_NAME}, one each for the routing keys
 * {@code errorKey}, {@code infoKey} and {@code debugKey}, in that order.
 *
 * @param args unused
 */
public static void main(String[] args){
    final String exchange = RabbitmqConstant.DIRECT_EXCHANGE_NAME;
    // {routing key, message text}, published in this order.
    final String[][] routedMessages = {
        {"errorKey", "error message"},
        {"infoKey", "info message"},
        {"debugKey", "debug message"},
    };
    final ConnectionFactory factory = RabbitmqUtil.getConnectionFactory();

    try (Connection conn = factory.newConnection();
         Channel ch = conn.createChannel()) {
        // Declare the exchange as direct.
        ch.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
        // Send each message with its own routing key.
        for (String[] entry : routedMessages) {
            ch.basicPublish(exchange, entry[0], null, entry[1].getBytes(StandardCharsets.UTF_8));
        }
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
}
十二、消费者
package com.lcy.cloud.video.consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description 路由模式全部消息消费者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class RouteCommonRabbitMqConsumer {
/**
 * Consumes every message sent to the direct exchange
 * {@code RabbitmqConstant.DIRECT_EXCHANGE_NAME} by binding one queue to all three routing
 * keys ({@code errorKey}, {@code infoKey}, {@code debugKey}).
 *
 * <p>The connection and channel are never closed in this method, so the consumer keeps
 * listening after registration.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 */
public static void main(String[] args) throws IOException, TimeoutException{
    final String exchange = RabbitmqConstant.DIRECT_EXCHANGE_NAME;
    final String[] bindingKeys = {"errorKey", "infoKey", "debugKey"};

    final Connection conn = RabbitmqUtil.getConnectionFactory().newConnection();
    final Channel ch = conn.createChannel();

    // Declare the exchange as direct.
    ch.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
    // Let the broker create a queue for this consumer (exclusive queue) and take its name.
    final String queue = ch.queueDeclare().getQueue();
    // Bind the same queue once per routing key, so it receives all three kinds of message.
    for (String key : bindingKeys) {
        ch.queueBind(queue, exchange, key);
    }

    // Second argument true: deliveries are acknowledged automatically.
    ch.basicConsume(
        queue,
        true,
        (tag, delivery) -> {
            final String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + text + "'");
        },
        tag -> { });
}
}package com.lcy.cloud.video.consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description 路由模式errorKey消息消费者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class RouteErrorRabbitMqConsumer {
/**
 * Consumes only the messages published with routing key {@code errorKey} on the direct
 * exchange {@code RabbitmqConstant.DIRECT_EXCHANGE_NAME}.
 *
 * <p>The connection and channel are never closed in this method, so the consumer keeps
 * listening after registration.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 */
public static void main(String[] args) throws IOException, TimeoutException{
    final String exchange = RabbitmqConstant.DIRECT_EXCHANGE_NAME;
    final String bindingKey = "errorKey";

    final ConnectionFactory factory = RabbitmqUtil.getConnectionFactory();
    final Channel ch = factory.newConnection().createChannel();

    // Declare the exchange as direct.
    ch.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);
    // Let the broker create a queue for this consumer (exclusive queue) and take its name.
    final String queue = ch.queueDeclare().getQueue();
    // Only the error routing key is bound, so other messages never reach this queue.
    ch.queueBind(queue, exchange, bindingKey);

    final DeliverCallback printBody = (tag, delivery) -> {
        final byte[] raw = delivery.getBody();
        System.out.println(" [x] Received '" + new String(raw, StandardCharsets.UTF_8) + "'");
    };
    // Second argument true: deliveries are acknowledged automatically.
    ch.basicConsume(queue, true, printBody, tag -> { });
}
}
十三、主题模式topic
交换机是 topic, 可以实现发布订阅模式fanout和路由模式Direct 的功能,更加灵活,支持模式匹配,通配符等
交换机通过通配符把消息转发到对应的队列:* 匹配恰好一个词,# 匹配零个或多个词,一般用 # 作为通配符居多;多个词之间使用 .(点)进行分隔。比如 #.order 会匹配 info.order、sys.error.order,而 *.order 只会匹配 info.order;如果是 *.*,则 info.order、error.order 都会匹配
注意:
交换机和队列绑定时,binding key 可以使用带通配符的路由键
生产者发送消息时需要使用具体的路由键
十四、生产者
package com.lcy.cloud.video.product;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description topic消息生产者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class TopicRabbitMqProduct {
/**
 * Publishes three messages to the topic exchange
 * {@code RabbitmqConstant.TOPIC_EXCHANGE_NAME}, using the concrete routing keys
 * {@code error.key}, {@code info.key} and {@code debug.key}, in that order.
 *
 * @param args unused
 */
public static void main(String[] args){
    final String exchange = RabbitmqConstant.TOPIC_EXCHANGE_NAME;
    // Parallel arrays: routingKeys[i] is the key used for texts[i].
    final String[] routingKeys = {"error.key", "info.key", "debug.key"};
    final String[] texts = {"error message", "info message", "debug message"};

    try (Connection conn = RabbitmqUtil.getConnectionFactory().newConnection();
         Channel ch = conn.createChannel()) {
        // Declare the exchange as topic.
        ch.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
        // Send each message with its own routing key.
        for (int i = 0; i < routingKeys.length; i++) {
            final byte[] payload = texts[i].getBytes(StandardCharsets.UTF_8);
            ch.basicPublish(exchange, routingKeys[i], null, payload);
        }
    } catch (IOException | TimeoutException e) {
        e.printStackTrace();
    }
}
}
十五、消费者
package com.lcy.cloud.video.consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description topic所有消息消费者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class TopicCommonRabbitMqConsumer {
/**
 * Consumes messages from the topic exchange {@code RabbitmqConstant.TOPIC_EXCHANGE_NAME}
 * through a queue bound with the wildcard pattern {@code *.key}.
 *
 * <p>The connection and channel are never closed in this method, so the consumer keeps
 * listening after registration.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 */
public static void main(String[] args) throws IOException, TimeoutException{
    final String exchange = RabbitmqConstant.TOPIC_EXCHANGE_NAME;
    final String pattern = "*.key";

    // Handler for incoming deliveries: decode the body as UTF-8 and print it.
    final DeliverCallback onMessage = (tag, delivery) ->
        System.out.println(" [x] Received '" + new String(delivery.getBody(), StandardCharsets.UTF_8) + "'");

    final Connection conn = RabbitmqUtil.getConnectionFactory().newConnection();
    final Channel ch = conn.createChannel();

    // Declare the exchange as topic.
    ch.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
    // Let the broker create a queue for this consumer (exclusive queue) and take its name.
    final String queue = ch.queueDeclare().getQueue();
    // Bind with a wildcard pattern rather than a concrete routing key.
    ch.queueBind(queue, exchange, pattern);

    // Second argument true: deliveries are acknowledged automatically.
    ch.basicConsume(queue, true, onMessage, tag -> { });
}
}package com.lcy.cloud.video.consumer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import com.lcy.cloud.video.constant.RabbitmqConstant;
import com.lcy.cloud.video.util.RabbitmqUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* @Description topic错误消息消费者
* @Author lcy
* @Date 2021/8/3 17:21
*/
public class TopicErrorRabbitMqConsumer {
/**
 * Consumes only the messages published with routing key {@code error.key} on the topic
 * exchange {@code RabbitmqConstant.TOPIC_EXCHANGE_NAME}.
 *
 * <p>The connection and channel are never closed in this method, so the consumer keeps
 * listening after registration.
 *
 * @param args unused
 * @throws IOException if a broker operation fails
 * @throws TimeoutException if opening the connection times out
 */
public static void main(String[] args) throws IOException, TimeoutException{
    final String exchange = RabbitmqConstant.TOPIC_EXCHANGE_NAME;

    final ConnectionFactory factory = RabbitmqUtil.getConnectionFactory();
    final Channel ch = factory.newConnection().createChannel();

    // Declare the exchange as topic.
    ch.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
    // Let the broker create a queue for this consumer (exclusive queue) and take its name.
    final String queue = ch.queueDeclare().getQueue();
    // Bind with the concrete error key, so only error messages reach this queue.
    ch.queueBind(queue, exchange, "error.key");

    // Second argument true: deliveries are acknowledged automatically.
    ch.basicConsume(
        queue,
        true,
        (tag, delivery) -> {
            final String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + text + "'");
        },
        tag -> { });
}
}