美文网首页
消息发送基本类型

消息发送基本类型

作者: 洛美萨斯 | 来源:发表于2020-02-24 00:37 被阅读0次

1. 步骤分析

  • 导入MQ客户端依赖
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>
  • 消息发送者步骤分析
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体
5.发送消息
6.关闭生产者producer
  • 消息消费者步骤分析
1.创建消费者Consumer,制定消费者组名
2.指定Nameserver地址
3.订阅主题Topic和Tag
4.设置回调函数,处理消息
5.启动消费者consumer

2. 基本样例

2.1 消息发送

1) 发送同步消息

这种可靠性同步地发送方式使用比较广发,比如:重要的消息通知,短信通知。

public class SyncProducerTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数1:消息主题Topic
             * 参数2:消息Tag
             * 参数3:消息内容
             */
            Message msg = new Message("base", "Tag1", ("Hello World!+ " + i).getBytes());
            //5.发送消息
            SendResult result = producer.send(msg);
            //发送状态
            SendStatus staus = result.getSendStatus();
            //消息ID
            String msgId = result.getMsgId();
            //消息接收队列ID
            int queueId = result.getMessageQueue().getQueueId();
            System.out.println("发送状态:" + staus + ",消息ID:" + msgId + ",队列:" + queueId);

            System.out.println("发送结果:" + result);
            //线程睡一秒
            TimeUnit.MILLISECONDS.sleep(100);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

2)发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

public class AsyncProducerTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数1:消息主题Topic
             * 参数2:消息Tag
             * 参数3:消息内容
             */
            Message msg = new Message("base", "Tag2", ("Hello World!+ " + i).getBytes());
            //5.发送消息
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult result) {
                    //发送状态
                    SendStatus staus = result.getSendStatus();
                    //消息ID
                    String msgId = result.getMsgId();
                    //消息接收队列ID
                    int queueId = result.getMessageQueue().getQueueId();
                    System.out.println("发送状态:" + staus + ",消息ID:" + msgId + ",队列:" + queueId);

                    System.out.println("发送结果:" + result);

                }

                @Override
                public void onException(Throwable e) {
                    e.printStackTrace();
                }
            });
            //线程睡一秒
            TimeUnit.MILLISECONDS.sleep(100);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }

3) 单向发送消息

这种方式主要用在不特别关心发送结果的场景,比如,日志发送

public class OnewayProducerTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        //2.指定Nameserver地址
        producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
        //3.启动producer
        producer.start();
        for (int i = 0; i < 100; i++) {
            //4.创建消息对象,指定主题Topic、Tag和消息体
            /**
             * 参数1:消息主题Topic
             * 参数2:消息Tag
             * 参数3:消息内容
             */
            Message msg = new Message("base", "Tag3", ("Hello World!+单向消息 " + i).getBytes());
            //5.发送消息
            producer.sendOneway(msg);
            // 睡眠100ms
            TimeUnit.MILLISECONDS.sleep(100);
        }
        //6.关闭生产者producer
        producer.shutdown();
    }
}

2.2 消费消息

1)负载均衡模式

消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。

public class ClusterConsumerTest {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer comsumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        comsumer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
        //3.订阅主题Topic和Tag
        comsumer.subscribe("base", "Tag2");
        //设置负载均衡消费(默认模式)
        comsumer.setMessageModel(MessageModel.CLUSTERING);
        //4.设置回调函数,处理消息
        comsumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        comsumer.start();
    }
}

2) 广播模式

消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

public class BroadcastConsumerTest {
    public static void main(String[] args) throws Exception {
        //1.创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer comsumer = new DefaultMQPushConsumer("group1");
        //2.指定Nameserver地址
        comsumer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
        //3.订阅主题Topic和Tag
        comsumer.subscribe("base", "Tag1");
        //广播模式消费
        comsumer.setMessageModel(MessageModel.BROADCASTING);
        //4.设置回调函数,处理消息
        comsumer.registerMessageListener(new MessageListenerConcurrently() {
            //接受消息内容
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5.启动消费者consumer
        comsumer.start();
    }
}

相关文章

  • 49.消息类型-普通消息

    发送不同类型的消息 普通消息 RocketMQ提供三种方式来发送普通消息:可靠同步发送,可靠异步发送和单向发送。 ...

  • 视频直播

    LeanClound消息类型自定义 主要矛盾在于过去接收和发送的消息类型与现在接收和发送消息类型完全不一样,次要矛...

  • 4、RocketMQ基础-消息发送样例

    消息发送样例 导入MQ客户端依赖 2、* 消息发送者步骤分析 消息消费者步骤分析 基本样例 消息发送 发送同步消息...

  • 设计模式系列11--桥接模式

    假设要实现一个给客户发送提示消息的功能,发送的消息类型可分为:普通消息、加急消息、特加急消息等等,而每种消息的发送...

  • ActiviteMQ接收和发送消息基本流程

    ActiviteMQ接收和发送消息基本流程 发送消息的基本步骤: (1)、创建连接使用的工厂类JMS Connec...

  • roketmq-4.x官方文档-基本样例

    1 基本样例来源 :官方文档 在基本样例中我们提供如下的功能场景: 使用RocketMQ发送三种类型的消息:同步消...

  • 第14条:理解“类对象”的用意

    给接受者发送消息,应该指明消息接受者的具体类型,这样向其发送无法解读的消息,编译器就会产生警告,而类型为id的对象...

  • Android-Handler发送消息

    Handler 发送消息 android中使用handler发送异步消息刷新UI是最基本的知识点,但如何优雅发送一...

  • 阿里云rocketMQ接入

    核心概念 Topic:消息主题,一级消息类型,生产者向其发送消息。 生产者:也称为消息发布者,负责生产并发送消息至...

  • Kafka使用笔记(二、生产者详解)

    消息发送类型 发送即忘记 同步发送 异步发送 序列化器 消息要到网络上进行传输,必须进行序列化,而序列化器的作用就...

网友评论

      本文标题:消息发送基本类型

      本文链接:https://www.haomeiwen.com/subject/ykscfhtx.html