ActiveMQ

作者: lhsjohn | 来源:发表于2019-03-03 23:08 被阅读0次

What is ActiveMQ?

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
主要特点:

  1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
  3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
  4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
  5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通过JDBC和journal提供高速的消息持久化
  7. 从设计上保证了高性能的集群,客户端-服务器,点对点
  8. 支持Ajax
  9. 支持与Axis的整合
  10. 可以很容易得调用内嵌JMS provider,进行测试

ActiveMQ的消息形式

对于消息的传递有两种类型:

一种是点对点的,即一个生产者和一个消费者一一对应;

另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  1. · StreamMessage -- Java原始值的数据流
  2. · MapMessage--一套名称-值对
  3. · TextMessage--一个字符串对象
  4. · ObjectMessage--一个序列化的 Java对象
  5. · BytesMessage--一个字节的数据流
a.png

使用测试(未整合Spring)


public void testQueueProducer() throws Exception {
        // 1.创建一个连接工厂对象,需要指定服务的ip以及端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
        // 2.使用工厂对象来创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        // 3.开启连接,调用Connection对象的start方法
        connection.start();
        // 4.创建一个Session对象
        // 第一个参数:是否开启事务。如果true开启事务,第二个无意义。一般不开启false
        // 第二个参数:如果不开启事务。应答模式,一般是自动应答或者手动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.使用Session对象创建一个Destination对象。两种形式queue、topic
        Queue queue = session.createQueue("spring-queue");
        // 6.使用Session对象创建一个Producer对象
        MessageProducer producer = session.createProducer(queue);
        // 7.创建一个Message对象,可以使用TextMessage。
        /*
         * TextMessage textMessage=new ActiveMQTextMessage();
         * textMessage.setText("hello Activemq");
         */
        TextMessage textMessage = session.createTextMessage("hello Activemq");
        // 8.发送消息
        producer.send(textMessage);
        // 9.关闭资源
        producer.close();
        session.close();
        connection.close();

    }


    public void testQueueConsumer() throws Exception {
        // 创建一个ConnectionFactory对象连接MQ服务器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
        // 创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 使用Connection对象创建一个Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建一个Destinatiion对象,queue对象
        Queue queue = session.createQueue("spring-queue");
        // 使用Session对象创建一个消费者对象
        MessageConsumer consumer = session.createConsumer(queue);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印结果
                TextMessage textMessage = (TextMessage) message;
                String text;

                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {

                    e.printStackTrace();
                }
            }
        });

        // 等待接收消息
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

    
    public void testTopicProducer() throws Exception {
        // 1.创建一个连接工厂对象,需要指定服务的ip以及端口
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
        // 2.使用工厂对象来创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        // 3.开启连接,调用Connection对象的start方法
        connection.start();
        // 4.创建一个Session对象
        // 第一个参数:是否开启事务。如果true开启事务,第二个无意义。一般不开启false
        // 第二个参数:如果不开启事务。应答模式,一般是自动应答或者手动应答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5.使用Session对象创建一个Destination对象。两种形式queue、topic
        Topic topic = session.createTopic("test-topic");
        // 6.使用Session对象创建一个Producer对象
        MessageProducer producer = session.createProducer(topic);
        // 7.创建一个Message对象,可以使用TextMessage。
        /*
         * TextMessage textMessage=new ActiveMQTextMessage();
         * textMessage.setText("hello Activemq");
         */
        TextMessage textMessage = session.createTextMessage("topicMessage");
        // 8.发送消息
        producer.send(textMessage);
        // 9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }


    public void testTopicConsumer() throws Exception {
        // 创建一个ConnectionFactory对象连接MQ服务器
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.128:61616");
        // 创建一个连接对象
        Connection connection = connectionFactory.createConnection();
        // 开启连接
        connection.start();
        // 使用Connection对象创建一个Session对象
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建一个Destinatiion对象,queue对象
        Topic topic = session.createTopic("test-topic");
        // 使用Session对象创建一个消费者对象
        MessageConsumer consumer = session.createConsumer(topic);
        // 接收消息
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                // 打印结果
                TextMessage textMessage = (TextMessage) message;
                String text;

                try {
                    text = textMessage.getText();
                    System.out.println(text);
                } catch (JMSException e) {

                    e.printStackTrace();
                }
            }
        });
      
        System.out.println("topic 消费者3已经启动");
        // 等待接收消息
        System.in.read();
        // 关闭资源
        consumer.close();
        session.close();
        connection.close();
    }



作者:lhsjohn

相关文章

网友评论

      本文标题:ActiveMQ

      本文链接:https://www.haomeiwen.com/subject/cssvuqtx.html