zoukankan      html  css  js  c++  java
  • MQ(队列消息的入门)

      消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成,通过提供消息传递消息排队模型,它可以在分布式环境下拓展进程间的通信,对于消息中间件,常见的角色大致也就有Producer(生产者).Consumer(消费者)

    MQ     消息中间件     消息队列

    Message Queue简称MQ

    种类:

    1.Apache  ActiveMQ  

         ActiveMQ是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现.我们在本次课程中介绍ActiveMQ的使用.

    2.阿里  RocketMQ

      

    3.Pivotal 开发RabbitMQ

      AMQP协议的领导实现,支持多种场景.淘宝的Mysql集群内部有使用它进行通讯,OpenStack开源云平台的通信组件,最先在金融行业得到运用.

    ZeroMQ

      史上最快的消息队列系统.

    kafka:

      Apache下的一个子项目.特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统.适合处理海量数据.

    使用场景(为什么使用MQ?):

    //TODO

    JMS简介

    什么是JMS?

        JMS(Java Messaging Service)是java平台上有关面向消息中间件的技术规范(可以使用jmsTemplate),它便于消息系统中的java应用程序进行消息交换,并且通过提供标准的产生,发送,接收消息的接口库简化企业应用的开发.

      JMS本身只定义了一系列的接口规范,是一种与厂商无关的API.用来访问消息收发系统.它类似于JDBC(java Database Connectivity);这里,JDBC是可以用来访问许多不同关系数据库的的API.而JMS则提供同样与厂商无关的访问方法,以访问消息收发服务.许多厂商目前都支持JMS,包括IBM的MQseries.BEA的Weblogic.JMSservice和Progress的SonicMQ,这只是几个例子.JMS使您能够通过消息收发服务(有时称为消息中介程序或者路由器)从一个JMS客户机向另一个JMS客户机发送消息,消息是JMS中的一种类型对象,由两部分组成:报头消息主体.

    报头由路由信息以及有关该消息的元数据组成.

    消息主体则携带着应用程序的数据或有效负载.

      JMS定义了五种不同的消息正文格式,以及调用的消息类型.允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性.

      TextMessage ---一个字符串对象

      MapMessage---一套名称-值对

      ObjectMessage--一个序列化的java对象

      ByteMessage -- 一个字节的数据流

      StreamMessage --Java原始值的数据流

    JMS消息传递类型

      对于消息的传递有两种类型:

        一种是点对点,即一个生产者和一个消费者一一对应.

        

        另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收.

    JMS入门小Demo

    现在是点对点模式:

      点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接行ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发现接收端,如果没有接收端接收,多个接收端,但是一条消息,只会被一个接收端给接收到,那个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息.

    创建的是一个没有使用任何骨架的java工程

    引入的依赖为:

       <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.13.4</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <!-- java编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>

       代码

    /**
     * @Auther:qingmu
     * @Description:脚踏实地,只为出人头地
     * @Date:Created in 16:16 2019/4/23
     */
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 点对点模式
     * 生产者
     */
    
    public class QueueProducer {
        public static void main(String[] args) throws Exception {
            //1.创建连接工厂
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            /**
             *  AUTO_ACKNOWLEDGE = 1    自动确认
             •    CLIENT_ACKNOWLEDGE = 2    客户端手动确认
             •    DUPS_OK_ACKNOWLEDGE = 3    自动批量确认
             •    SESSION_TRANSACTED = 0    事务提交并确认
    
             */
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(queue);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
    
        }
    }

    直接运行这个main方法,后可以mq中查看到:

    消费者的代码为:

    /**
     * @Auther:qingmu
     * @Description:脚踏实地,只为出人头地
     * @Date:Created in 16:23 2019/4/23
     */
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 点对点模式
     * 消息消费者
     */
    public class QueueConsumer {
        public static void main(String[] args) throws Exception{
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建队列对象
            Queue queue = session.createQueue("test-queue");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(queue);
    
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
    
        }
    }

    通过上面定义的监听器,可以获取到生产者生产的信息.在控制台上

    在进行测试的时候,开启两个以上的消费者,开启一个生产者,然后可以观察到只能在一个消费者的控制台上进行显示,而在另一个消费者的控制台上不能进行打印.

    发布和订阅者模式

    消费生产者:

    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * @Auther:qingmu
     * @Description:脚踏实地,只为出人头地
     * @Date:Created in 16:32 2019/4/23
     */
    public class TopicProducer {
        public static void main(String[] args) throws Exception{
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            Topic topic = session.createTopic("test-topic");
            //6.创建消息生产者
            MessageProducer producer = session.createProducer(topic);
            //7.创建消息
            TextMessage textMessage = session.createTextMessage("欢迎来到神奇的世界");
            //8.发送消息
            producer.send(textMessage);
            //9.关闭资源
            producer.close();
            session.close();
            connection.close();
    
        }
    }

     运行完以后的结果为:

     

    消费者为:

    /**
     * @Auther:qingmu
     * @Description:脚踏实地,只为出人头地
     * @Date:Created in 16:36 2019/4/23
     */
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 订阅
     * 多对多
     */
    public class TopicConsumer {
        public static void main(String[] args) throws Exception{
            //1.创建连接工厂
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2.获取连接
            Connection connection = connectionFactory.createConnection();
            //3.启动连接
            connection.start();
            //4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5.创建主题对象
            //Queue queue = session.createQueue("test-queue");
            Topic topic = session.createTopic("test-topic");
            //6.创建消息消费
            MessageConsumer consumer = session.createConsumer(topic);
    
            //7.监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage=(TextMessage)message;
                    try {
                        System.out.println("接收到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            //8.等待键盘输入
            System.in.read();
            //9.关闭资源
            consumer.close();
            session.close();
            connection.close();
    
        }
    }

    运行测试:

      同时开启2个以上的消费者,再次运行生产者,观察每一个消费者控制台的输出,会发现每个消费者会接收到消息.

  • 相关阅读:
    好用的抓取dump的工具-ProcDump
    好用的抓取dump的工具-ProcDump
    使用Windbg调试系统弹出的内存不可读错误
    使用Windbg调试系统弹出的内存不可读错误
    驱动层hook系统函数的时,如何屏蔽掉只读属性?
    Rootkit之SSDT hook(通过CR0)
    Rootkit之SSDT hook(通过CR0)
    #define SYSTEMSERVICE(_func) KeServiceDescriptorTable.ServiceTableBase[ *(PULONG)((PUCHAR)_func+1) 这
    华南理工大学“三七互娱杯”程序设计竞赛(重现赛)B HRY and fibonacci
    华南理工大学“三七互娱杯”程序设计竞赛(重现赛)A HRY and codefire
  • 原文地址:https://www.cnblogs.com/qingmuchuanqi48/p/10755340.html
Copyright © 2011-2022 走看看