新建一个名为rabbitmq-01的demo项目
image.png
因为我不是新建的maven项目,所以下载下列两个jar,
amqp-client-5.7.1.jar,slf4j-api-1.7.26.jar
导入项目
image.png
image.png
然后选择你要导入的jar
连接rabbitmq
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Main {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
System.out.println(connection);
}
}
运行之后,这里不没有关闭,主要为了看下效果,浏览器输入http://localhost:15672
da
你可以看到已经在rabbitmq中注入了管道,这说明已经连接成功
发送消息
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Main {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel=connection.createChannel();
//exchange的类型包括:direct, topic, headers and fanout,我们本例子主要关注的是fanout
//fanout类型是指向所有的队列发送消息
//以下是创建一个fanout类型的exchange,取名logs
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "先新建交换器,再发送信息";
//这里用了默认的exchanges,一个空字符串 "",在basicPublish这个方法中,第一个参数即是exchange的名称
//2.准备向我们命名的exchange发送消息啦
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
image.png
BuiltinExchangeType.FANOUT,新建一个fanout的交换器类型, fanout也就是发送给所有消费者
下面实现基于交换器的发布订阅模式
还是刚才的demo项目 ,指定一个发送到所有队列的FANOUT模式队列, 首先新建一个Producer_FANOUT,看名字就知道是提供者类了
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_FANOUT {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
Connection connection = connectionFactory.newConnection();
Channel channel=connection.createChannel();
//exchange的类型包括:direct, topic, headers and fanout,我们本例子主要关注的是fanout
//fanout类型是指向所有的队列发送消息
//以下是创建一个fanout类型的exchange,取名logs
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "先新建交换器,再发送信息";
//这里用了默认的exchanges,一个空字符串 "",在basicPublish这个方法中,第一个参数即是exchange的名称
//2.准备向我们命名的exchange发送消息啦
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
再新建一个名为Consumer_FANOUT_1,Consumer_FANOUT_2的订阅者类,也可以说是接受者类
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_FANOUT_1 {
private static String QUEUE_NAME = "Consumer_FANOUT_1"; //队列名称
private static final String EXCHANGE_NAME = "logs"; //交换器名称
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
try {
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
final Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列与交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//消息服务器每次只向消费者发送一条消息
Consumer consumer = new DefaultConsumer(channel) {
//重写DefaultConsumer中handleDelivery方法,在方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//消息沉睡一秒
Thread.sleep(1000);
String message = new String(body, "UTF-8");
System.out.println("Consumer_FANOUT_1 收到消息 '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Consumer_FANOUT_1 消息消费完成....");
}
}
};
//监听消息
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_FANOUT_2 {
private static String QUEUE_NAME = "Consumer_FANOUT_2"; //队列名称
private static final String EXCHANGE_NAME = "logs"; //交换器名称
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
try {
//创建连接
Connection connection = factory.newConnection();
//创建消息通道
final Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列与交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
//消息服务器每次只向消费者发送一条消息
Consumer consumer = new DefaultConsumer(channel) {
//重写DefaultConsumer中handleDelivery方法,在方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//消息沉睡一秒
Thread.sleep(1000);
String message = new String(body, "UTF-8");
System.out.println("Consumer_FANOUT_1 收到消息 '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("Consumer_FANOUT_1 消息消费完成....");
}
}
};
//监听消息
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
然后我们先运行两个接受这类,再运行提供者类, 结果显示, 两个接收者类都能接收到提供者类发布的消息
注意: 接收者类队列名字不能相同,如果想当将只有一个接受者类能接收到消息
DIRECT模式
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_DIRECT {
private static final String EXCHANGE_NAME = "exchange-routing";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String message = "Hello World!";
//发送消息
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
//发送消息,指定key
channel.basicPublish(EXCHANGE_NAME, "key2", null, (message + i).getBytes());
System.out.println(" 偶数消息 '" + message + i + "'");
} else {
//发送消息
channel.basicPublish(EXCHANGE_NAME, "key1", null, (message + i).getBytes());
System.out.println(" 奇数消息 '" + message + i + "'");
}
}
} catch (Exception e) {
} finally {
channel.close();
connection.close();
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_DIRECT_1 {
/**
* 队列名字
*/
private static String QUEUE_NAME = "queue1";
private static final String EXCHANGE_NAME = "exchange-routing";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
Connection connection = null;
try {
//创建连接
connection = factory.newConnection();
//创建消息通道
final Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(QUEUE_NAME,false, false, false, null);
//绑定队列与交换机,指定接收的消息的key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
//消息服务器每次只向消费者发送一条消息
Consumer consumer = new DefaultConsumer(channel){
//重写DefaultConsumer中handleDelivery方法,在方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//消息沉睡一秒
Thread.sleep(1000);
String message = new String(body, "UTF-8");
System.out.println("consumer1 收到消息 '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("consumer1 消息消费完成....");
}
}
};
//监听消息
channel.basicConsume(QUEUE_NAME, true,consumer);
}catch (Exception e) {
e.printStackTrace();
}finally {
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_DIRECT_2 {
/**
* 队列名字
*/
private static String QUEUE_NAME = "queue1";
private static final String EXCHANGE_NAME = "exchange-routing";
public static void main(String[] args){
//创建连接工厂QUEUE_NAME
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
Connection connection = null;
try {
//创建连接
connection = factory.newConnection();
//创建消息通道
final Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明队列
channel.queueDeclare(QUEUE_NAME,false, false, false, null);
//绑定队列与交换机,指定接收的消息的key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
Consumer consumer = new DefaultConsumer(channel){
//重写DefaultConsumer中handleDelivery方法,在方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException{
try {
//消息沉睡100ms
Thread.sleep(100);
String message = new String(body, "UTF-8");
System.out.println("consumer2 收到消息 '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("consumer2 消息消费完成....");
}
}
};
//监听消息,第二个参数为true时表示自动确认
channel.basicConsume(QUEUE_NAME, true,consumer);
}catch (Exception e) {
e.printStackTrace();
}finally {
}
}
}
输出的结果是
image.png
image.png
image.png
TOPIC模式
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_TOPIC {
private static final String EXCHANGE_NAME = "exchange-topic";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
connection = factory.newConnection();
channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String message = "Hello World!";
//发送消息,绑定模式
channel.basicPublish(EXCHANGE_NAME, "key.one", null, ("one -" + message).getBytes());
channel.basicPublish(EXCHANGE_NAME, "key.two.msg", null, ("two -" + message).getBytes());
}catch (Exception e) {
}finally {
channel.close();
connection.close();
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_TOPIC_1 {
/**
* 队列名字
*/
private static String QUEUE_NAME = "queue-topic_1";
private static final String EXCHANGE_NAME = "exchange-topic";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
Connection connection = null;
try {
//创建连接
connection = factory.newConnection();
//创建消息通道
final Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列与交换机,使用通配符key.* 表示只能匹配key下的一个路径,
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
//消息服务器每次只向消费者发送一条消息
// channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
//重写DefaultConsumer中handleDelivery方法,在方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//消息沉睡一秒
Thread.sleep(1000);
String message = new String(body, "UTF-8");
System.out.println("consumer1 收到消息 '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("consumer1 消息消费完成....");
}
}
};
//监听消息
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer_TOPIC_2 {
/**
* 队列名字
*/
private static String QUEUE_NAME = "queue-topic_1";
private static final String EXCHANGE_NAME = "exchange-topic";
public static void main(String[] args) {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务器主机
factory.setHost("127.0.0.1");
//设置用户名
factory.setUsername("guest");
//设置密码
factory.setPassword("guest");
Connection connection = null;
try {
//创建连接
connection = factory.newConnection();
//创建消息通道
final Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//绑定队列与交换机,使用通配符key.# 表示只能匹配key下的一个路径,
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
//消息服务器每次只向消费者发送一条消息
// channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
//重写DefaultConsumer中handleDelivery方法,在方法中获取消息
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//消息沉睡一秒
Thread.sleep(1000);
String message = new String(body, "UTF-8");
System.out.println("consumer2 收到消息 '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("consumer2 消息消费完成....");
}
}
};
//监听消息
channel.basicConsume(QUEUE_NAME, true, consumer);
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
}
结果如下:
image.png
image.png









网友评论