zoukankan      html  css  js  c++  java
  • 消息中间件 JMS

    一、什么是?

      JMS : Java Message Service(Java消息服务),Java平台中关于面向消息中间件的接口

      重点在于接口,接口就意味着与JDBC类似,仅仅有声明,没有实现,具体的实现交给厂商. 接口本身是一种与厂商无关的API.

      

      支持的消息类型

      TextMessage 字符串对象

      MapMessage 名值对

      ObjectMessage 一个序列化的java对象

      BytesMessage 字节的数据流

      StreamMessage 原始数据值的数据流

     

      作用

      在soa 分布式架构系统中 或者企业多个项目中 进行多个系统异步传输消息

      优点 使用消息服务器 当做大一点的队列使用 先进先出 处理高并发写入问题,似的=业务由串行改为并行 处理效率高

      缺点:实时性不高 因为数据发送给消息服务器 消息服务器不会立刻处理 队列是先进先出,如果数据量大需要排队等候

     

    二、使用场景

    每一种技术都有其产生的原因和解决的问题,消息服务的使用场景如下:

      a:大的队列使用 先进先出 来处理高并发问题

      b:业务系统的串行改为并行 处理效率高

    • 核心应用:
      • 解耦: 订单 --> 物流 (降低工程间的强依赖程度)
      • 异步: 用户注册 --> 发送邮件,初始化信息 (耗时长非必须的业务抽离,放在空闲的时候处理)
      • 流量削峰: 秒杀,日志处理 (针对突发情况的大访问量场景)
    • 跨平台, 多语言
    • 分布式事务, 最终一致性 (引入和分布式和微服务之后,事务的处理和数据的一致是比较难保证的一个方面)
    • RPC调用上下游对接,数据源变动通知下属

    三、消息中间件常见概念与编程模型

    常见概念

    • JMS提供者: 连接面向消息中间件的,也就是JMS的接口实现, RocketMQ, ActiveMQ等. 简单的来说就是JMS接口的实现厂商提供的实现, 中间件本身.
    • Broker服务器: 接收生产消息,提供给消费者消费的程序
    • JMS生产者(Message Producer): 生产消息的服务
    • JMS消费者(Message Consumer): 消费消息的服务
    • JMS消息: 数据对象, 传递的内容
    • JMS队列: 存储待消费信息的区域(可以简单的理解为数据结构中的队列,可以保证顺序,存储消息的介质)
    • JMS主题(Topic): 一种支持发送消息给多个订阅者的机制(中转,不同的生产者发送消息到主题,经过服务器分发到不同的订阅者)
    • JMS消息通常的两种类型: 点对点(Point-to-Point), 发布/订阅(Publish/Subscribe). 区别在于前者是多个消费者但只有一个消费者可以消费消息,后者是多个消费者都可以同时消费一个消息.

    一个消息对应一个主题,一个主题下由多个queue. 要注意的是topic是一个逻辑管理单位,queue是物理管理单位

    基础编程模型

    • MQ中需要用到的一些类
      • ConnectionFactory: 连接工厂, JMS使用它来创建连接
      • Connection: JMS客户端到JMS Provider的连接
      • Session: 一个发送或接收消息的线程
      • Destination: 消息的目的地; 消息发送给谁(Broker)
      • MessageConsumer / MessageProducer: 消息消费者 / 消息生产者

     

     

    四、同类技术对比

      • ActiveMQ: 
        • 支持多种语言的客户端和协议,基于JMS Provider的实现,容易上手. 但是吞吐量不高,多队列的时候性能会下降,存在消息丢失情况.
      • Kafka: 
        • 由Scala和Java编写,可以处理大规模的网站中的所有动作流数据(网页浏览,搜索等),保证数据尽量不丢失,适用于大数据领域(电商等). 但是不支持批量个广播消息,运维难度大,文档少,需要掌握Scala
      • RabbitMQ: 

      • 开源的AMQP实现,服务端用Erlang语言编写,支持多种客户端,在易用性,扩展性和高可用性等方面表现不错. 但是使用Erlang语言开发,阅读和修改源码难度大.(当然公司如果大部分人熟悉Erlang语言的话例外)安全性高
    • ZeroMQ: (最快)

    五、小Demo

    pom.xml

    <dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-client</artifactId>
            <version>5.13.4</version>
        </dependency>
    </dependencies>
    

      

    点对点模式

    接收方

    package cn.liuhuan.core;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 接收方
     */
    public class QueueConsuer {
    
        public static void main(String[] args) throws Exception {
            //1  创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2 获得链接
            Connection connection = connectionFactory.createConnection();
            //3 启动连接
            connection.start();
            //4 获取session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5 创建队列
            Queue queue = session.createQueue("test01");
            //6 创建消息
            MessageConsumer consumer = session.createConsumer(queue);
            //7  监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8  等待键盘输入
            System.in.read();
            //9 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
    }

    发送方

    package cn.liuhuan.core;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 发送方
     */
    public class QueueProducer {
        public static void main(String[] args) throws Exception {
            //1  创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2 获得链接
            Connection connection = connectionFactory.createConnection();
            //3 启动连接
            connection.start();
            //4 获取session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5 创建队列
            Queue queue = session.createQueue("test01");
            //6 创建消息
            MessageProducer producer = session.createProducer(queue);
            //7  生产消息
            TextMessage textMessage = session.createTextMessage("这是收到的第一个消息");
            //8  发送消息
            producer.send(textMessage);
            //9 关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
    

      

    订阅发布模式

    接收方(多个都是一样的)

    package cn.liuhuan.core;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class TopicConsuer1 {
        public static void main(String[] args) throws Exception {
            //1  创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2 获得链接
            Connection connection = connectionFactory.createConnection();
            //3 启动连接
            connection.start();
            //4 获取session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5 创建队列
            Topic topic = session.createTopic("testTopic");
            //6 创建消息
            MessageConsumer consumer = session.createConsumer(topic);
            //7  监听消息
            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println(textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //8  等待键盘输入
            System.in.read();
            //9 关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

      

    发送方

    package cn.liuhuan.core;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 发送方
     */
    public class TopicProducer {
        public static void main(String[] args) throws Exception {
            //1  创建连接工厂
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.200.128:61616");
            //2 获得链接
            Connection connection = connectionFactory.createConnection();
            //3 启动连接
            connection.start();
            //4 获取session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //5 创建队列
            Topic topic = session.createTopic("testTopic");
            //6 创建消息
            MessageProducer producer = session.createProducer(topic);
            //7  生产消息
            TextMessage textMessage = session.createTextMessage("这是收到的第一个消息");
            //8  发送消息
            producer.send(textMessage);
            //9 关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
    

      

  • 相关阅读:
    C语言数据结构链表
    Servlet中对上传的图片进行大小变换
    网页中有几个框架,在其中一个框架中点击超链接刷新整个页面
    来园子开博了
    学习《java编程思想》导入作者的net.mindview包
    git常用命令汇总
    安装lessloader后,编译项目报错TypeError: this.getOptions is not a function
    数组学习二
    常见文件管理命令
    (转载)Shell语法
  • 原文地址:https://www.cnblogs.com/shiliuhuanya/p/12123597.html
Copyright © 2011-2022 走看看