美文网首页工作生活
第三章,发布订阅demo

第三章,发布订阅demo

作者: 毛仑上保罗先生 | 来源:发表于2019-06-30 22:14 被阅读0次

新建一个名为rabbitmq-01的demo项目


image.png

因为我不是新建的maven项目,所以下载下列两个jar,
amqp-client-5.7.1.jar,slf4j-api-1.7.26.jar
导入项目


image.png
image.png

然后选择你要导入的jar

连接rabbitmq

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Main {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        System.out.println(connection);
    }
}

运行之后,这里不没有关闭,主要为了看下效果,浏览器输入http://localhost:15672

da

你可以看到已经在rabbitmq中注入了管道,这说明已经连接成功

发送消息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Main {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel=connection.createChannel();

        //exchange的类型包括:direct, topic, headers and fanout,我们本例子主要关注的是fanout
        //fanout类型是指向所有的队列发送消息
        //以下是创建一个fanout类型的exchange,取名logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "先新建交换器,再发送信息";
 
        //这里用了默认的exchanges,一个空字符串 "",在basicPublish这个方法中,第一个参数即是exchange的名称
        //2.准备向我们命名的exchange发送消息啦
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();

     }
} 

image.png

BuiltinExchangeType.FANOUT,新建一个fanout的交换器类型, fanout也就是发送给所有消费者

下面实现基于交换器的发布订阅模式
还是刚才的demo项目 ,指定一个发送到所有队列的FANOUT模式队列, 首先新建一个Producer_FANOUT,看名字就知道是提供者类了

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_FANOUT {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel=connection.createChannel();

        //exchange的类型包括:direct, topic, headers and fanout,我们本例子主要关注的是fanout
        //fanout类型是指向所有的队列发送消息
        //以下是创建一个fanout类型的exchange,取名logs
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        String message = "先新建交换器,再发送信息";

        //这里用了默认的exchanges,一个空字符串 "",在basicPublish这个方法中,第一个参数即是exchange的名称
        //2.准备向我们命名的exchange发送消息啦
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();

     }
}

再新建一个名为Consumer_FANOUT_1,Consumer_FANOUT_2的订阅者类,也可以说是接受者类

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_FANOUT_1 {
    private static String QUEUE_NAME = "Consumer_FANOUT_1";  //队列名称
    private static final String EXCHANGE_NAME = "logs"; //交换器名称

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //创建消息通道
            final Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列与交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //消息服务器每次只向消费者发送一条消息
            Consumer consumer = new DefaultConsumer(channel) {
                //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,  AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("Consumer_FANOUT_1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("Consumer_FANOUT_1 消息消费完成....");
                    }

                }
            };
            //监听消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
}

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer_FANOUT_2 {
    private static String QUEUE_NAME = "Consumer_FANOUT_2";  //队列名称
    private static final String EXCHANGE_NAME = "logs"; //交换器名称

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        try {
            //创建连接
            Connection connection = factory.newConnection();
            //创建消息通道
            final Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列与交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //消息服务器每次只向消费者发送一条消息
            Consumer consumer = new DefaultConsumer(channel) {
                //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("Consumer_FANOUT_1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("Consumer_FANOUT_1 消息消费完成....");
                    }

                }
            };
            //监听消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
}

然后我们先运行两个接受这类,再运行提供者类, 结果显示, 两个接收者类都能接收到提供者类发布的消息

注意: 接收者类队列名字不能相同,如果想当将只有一个接受者类能接收到消息

DIRECT模式

 import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_DIRECT {
    private static final String EXCHANGE_NAME = "exchange-routing";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");

        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String message = "Hello World!";
            //发送消息
            for (int i = 0; i < 10; i++) {
                if (i % 2 == 0) {
                    //发送消息,指定key
                    channel.basicPublish(EXCHANGE_NAME, "key2", null, (message + i).getBytes());
                    System.out.println(" 偶数消息 '" + message + i + "'");
                } else {
                    //发送消息
                    channel.basicPublish(EXCHANGE_NAME, "key1", null, (message + i).getBytes());
                    System.out.println(" 奇数消息 '" + message + i + "'");
                }
            }
        } catch (Exception e) {

        } finally {
            channel.close();
            connection.close();
        }
    }
}

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_DIRECT_1 {
    /**
     * 队列名字
     */
    private static String QUEUE_NAME = "queue1";
    private static final String EXCHANGE_NAME = "exchange-routing";

    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        Connection connection = null;
        try {
            //创建连接
            connection = factory.newConnection();
            //创建消息通道
            final Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //绑定队列与交换机,指定接收的消息的key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key1");
            //消息服务器每次只向消费者发送一条消息
            Consumer consumer = new DefaultConsumer(channel){
                //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer1 消息消费完成....");
                    }

                }
            };
            //监听消息
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_DIRECT_2 {
    /**
     * 队列名字
     */
    private static String QUEUE_NAME = "queue1";
    private static final String EXCHANGE_NAME = "exchange-routing";
    public static void main(String[] args){
        //创建连接工厂QUEUE_NAME
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        Connection connection = null;
        try {
            //创建连接
            connection = factory.newConnection();
            //创建消息通道
            final Channel  channel = connection.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            //声明队列
            channel.queueDeclare(QUEUE_NAME,false, false, false, null);
            //绑定队列与交换机,指定接收的消息的key
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
            Consumer consumer = new DefaultConsumer(channel){
                //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException{
                    try {
                        //消息沉睡100ms
                        Thread.sleep(100);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer2 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        System.out.println("consumer2 消息消费完成....");
                    }

                }
            };
            //监听消息,第二个参数为true时表示自动确认
            channel.basicConsume(QUEUE_NAME, true,consumer);
        }catch (Exception e) {
            e.printStackTrace();
        }finally {
        }
    }
}

输出的结果是


image.png
image.png
image.png

TOPIC模式

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_TOPIC {
    private static final String EXCHANGE_NAME = "exchange-topic";
    public static void main(String[] args) throws IOException, TimeoutException {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String message = "Hello World!";
            //发送消息,绑定模式
            channel.basicPublish(EXCHANGE_NAME, "key.one", null, ("one -" + message).getBytes());
            channel.basicPublish(EXCHANGE_NAME, "key.two.msg", null, ("two -" + message).getBytes());
        }catch (Exception e) {

        }finally {
            channel.close();
            connection.close();
        }
    }
}

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_TOPIC_1 {
    /**
     * 队列名字
     */
    private static String QUEUE_NAME = "queue-topic_1";
    private static final String EXCHANGE_NAME = "exchange-topic";

    public static void main(String[] args) {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        Connection connection = null;
        try {
            //创建连接
            connection = factory.newConnection();
            //创建消息通道
            final Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列与交换机,使用通配符key.* 表示只能匹配key下的一个路径,
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
            //消息服务器每次只向消费者发送一条消息
//            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer1 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("consumer1 消息消费完成....");
                    }

                }
            };
            //监听消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
}

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer_TOPIC_2 {
    /**
     * 队列名字
     */
    private static String QUEUE_NAME = "queue-topic_1";
    private static final String EXCHANGE_NAME = "exchange-topic";

    public static void main(String[] args) {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务器主机
        factory.setHost("127.0.0.1");
        //设置用户名
        factory.setUsername("guest");
        //设置密码
        factory.setPassword("guest");
        Connection connection = null;
        try {
            //创建连接
            connection = factory.newConnection();
            //创建消息通道
            final Channel channel = connection.createChannel();
            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列与交换机,使用通配符key.# 表示只能匹配key下的一个路径,
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.#");
            //消息服务器每次只向消费者发送一条消息
//            channel.basicQos(1);
            Consumer consumer = new DefaultConsumer(channel) {
                //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        //消息沉睡一秒
                        Thread.sleep(1000);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer2 收到消息 '" + message + "'");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("consumer2 消息消费完成....");
                    }

                }
            };
            //监听消息
            channel.basicConsume(QUEUE_NAME, true, consumer);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
        }
    }
}

结果如下:

image.png
image.png

相关文章

  • 第三章,发布订阅demo

    新建一个名为rabbitmq-01的demo项目 因为我不是新建的maven项目,所以下载下列两个jar,amqp...

  • redis遇到的坑

    PUBSUB发布订阅模式的listen坑 demo 上面代码订阅者进入监听, 如果要停止监听在windows环境和...

  • Javaの设计模式之观察者模式

    推荐阅读:《设计模式之禅》 Observer Pattern 设计模式之观察者模式(发布订阅模式) Demo gi...

  • 创建对象的几种模式和一个事件管理器实现

    构造函数模式、混合模式、模块模式、工厂模式、单例模式、发布订阅模式的范例 . . . . . 具体实现 Demo[...

  • 三、云外云的谜团

    发布的第三章已被锁定,请大家移步个人订阅号“少年”(id:shaonian_01)阅读。

  • 如何手写一个简单发布订阅模式

    面试过程中很多面试官如何手写个发布订阅模式下面就是一个简单的demo

  • 发布-订阅

    参考 https://www.cnblogs.com/shenh/p/10497244.html 模式一:fan...

  • 发布订阅

    https://developer.mozilla.org/zh-CN/docs/Web/API/EventTar...

  • 发布订阅

    期望的数据类型{event: [fn1, fn2],}

  • 发布订阅

    //观察者模式和订阅发布模式的不同点在于,订阅发布模式 订阅者和发布者是解耦的,他们的关联是通过第三方来的//例子:

网友评论

    本文标题:第三章,发布订阅demo

    本文链接:https://www.haomeiwen.com/subject/sehicctx.html