1.创建RabbitMQ连接
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection()throws Exception {
//定义连接工厂
ConnectionFactory factory =new ConnectionFactory();
//设置服务地址
factory.setHost("192.168.189.130");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/leyou");
factory.setUsername("leyou");
factory.setPassword("leyou");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
我们定义了一个ConnectionUtile的工具类,可以通过工具类中的getConnection()方法得到RabbitMQ的连接。
得到Connection后,可以通过Connection.createChannel()方法,创建多个channel创建多个实例。但是Channel实例不能在线程间共享,应用程序应为每一个线程创建一个channel。
2.交换器和队列的使用
2.1.创建交换器
channel.exchangeDeclare(EXCHANGE_NAME,"topic",true);
exchangeDeclare方法详解
Exchange.DeclareOk exchangeDeclare(String exchange,
String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map arguments)throws IOException;
参数说明:
exchange:交换器的名称
type:交换器的类型。fanout、direct、topic、head
durable:是否持久化
autoDelete:是否自动删除。true表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此接触。
internal:设置是否是内置的。如果为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器上中,只能通过交换器路由到交换器这种方式
arguments:其他一些结构化参数。
该方法可重载。常用的重载方法有:
Exchange.DeclareOk exchangeDeclare(String exchange, String type)throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type,boolean durable)throws IOException;
2.2.创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
方法详解:
Queue.DeclareOk queueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,
Map arguments)throws IOException;
Queue.DeclareOk queueDeclare()throws IOException;
参数说明:
queue:队列的名字
durable:是否持久化
exclusive:设置是否排他。设置为true时,则为排他。如果一个队列被声明为排他队列,该队列仅对首次声明他的连接可见,并在连接断开时自动删除。
注意:1.排他队列是基于连接Connection可见的,同一个连接的不同信道是可以同时访问同一连接创建的排他队列。
2.如果一个连接已经声明了一个排他队列,其他连接是不允许建立其他排他队列的。
3.即使该队列是持久化的,一旦关闭连接或者客户端退出,该队列都会自动删除,这种队列适用于一个客户端同时发送和读取消息的场景。
autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
无参构造函数创建一个排他的、自动删除的、非持久化队列。
2.3.交换机与队列绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,routingKey);
方法详解:
Queue.BindOk queueBind(String queue, String exchange, String routingKey)throws IOException;
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map arguments)throws IOException;
void queueBindNoWait(String queue, String exchange, String routingKey, Map arguments)throws IOException;
参数详解:
queue:队列名称
exchange:交换器名称
routingKey:用来绑定队列和交换器的路由键
argument:定义当定的一些参数
网友评论