zoukankan      html  css  js  c++  java
  • 从通信开始聊聊消息中间件

    一、系统间通信方式

    我们可以想到的方式:

    • 基于文件
    • 基于共享内存
    • 基于IPC
    • 基于Socket
    • 基于数据库
    • 基于RPC

    各个模式的缺点:

    • 文件:使用不方便,不及时
    • Socket:使用麻烦,多数情况下不如RPC
    • 数据库:不实时,但是经常有人拿数据库模拟消息队列
    • RPC:会导致调用关系复杂,同步处理,压力大的时候无法缓冲

    我们期望有一种通信方式:

    • 可以实现异步的消息通信
    • 可以简化参与各方的复杂依赖关系
    • 可以在请求量很大的时候缓冲一下
    • 某些情况下能保障消息的可靠性、顺序性

    这就是MQ(Message Queue),MQ满足了以上的所有特性。另外对比其他通信模式,MQ的优势还有:

    • 异步通信:异步通信,减少线程等待
    • 系统解耦:系统不直接调用,降低依赖
    • 削峰平谷:压力大的时候,缓冲部分请求消息
    • 可靠通信:提供多种消息模式、保证消息尽可能不丢失

    二、MQ的简单介绍

    从队列到消息服务

    一个队列:

    多个队列:

    常见的两种消息模式

    • 点对点模式:一个生产者对应一个消费者
    • 发布订阅模式:一个生产者对应多个消费者

    消息处理的保障

    三种QoS

    • At most once,至多一次,消息可能丢失但是不会重复发送
    • At least once,至少一次,消息不会丢失,但是可能会重复
    • Exactly once,精确一次,每天消息被传输有且仅有一次

    消息的有序性

    同一个Topic或Queue的消息,保障消息顺序投递

    注意:如果做了消息分区或者批量预取之类的操作,可能就没有顺序了

    消息协议

    1. STOMP :面向流文本的消息传输协议,是WebSocket通信标准。
    • 协议简单、易于实现。
    1. JMS:面向Java平台的标准消息传递API
    • 在JVM语言(如Scala)上具又互用性
    • 支持事务
    • 有queue和topic两种传递模型
    • 能够定义消息格式(消息头、属性、内容)
    • 无须担心底层协议
    1. AMQP

    JMS可以在任何的底层协议上运行,但是API是与编程语言绑定的。而AMQP能够支持两种不同的编程语言使用它来传递消息。

    • 独立于平台的底层消息传递协议
    • 跨语言和平台
    • 它是底层协议的
    • 支持事务、分布式事务
    • 有derect、fanout、topic、headers、system五种交换类型
    • 支持长周期消息传递
    • 使用SASL和TLS确保安全性
    1. MQTT : 专为小设备设计的,是物联网生态中主要成分之一。
    • 面向流,内存占用低
    • 低带宽下能发送消息。
    • 不允许分段消息(很难发送长消息)
    • 主持主题发布-订阅
    • 不支持事务
    • 不支持安全连接
    1. XMPP:一个开源形式组织产生的网络即时通信协议
    • 基于XML的协议
    • 简单的客户端,将大多数工作都放在服务端进行
    1. Open Messaging:由阿里、雅虎、滴滴等公司共同参与创立的分布式消息中间件开发标准
    • 结构简单
    • 解析速度快
    • 支持事务和持久化设计

    为什么消息中间件不使用http协议呢?

    1. 因为http请求报文和响应报文是比较复杂的,包含了cookie、状态码、加解密等等附加的功能,但是对于消息来说不需要这么复杂。
    2. 大部分情况下http是短连接,在实际的交互中,如果请求到响应的过程中中断了,中断之后就不会持久化,就会造成消息丢失。

    三、消息中间件

    三代消息中间件:

    1. ActiveMQ、RabbitMQ
    2. Kafka、RocketMQ
    3. Apache Pulsar

    1. ActiveMQ介绍

    • 高可靠、事务性的消息队列
    • 当前应用最广泛的开源消息中间件
    • 功能最全的开源消息队列

    主要功能:

    • 多种语言和协议编写客户端

    语言:Java、C、C++、C#、Ruby、Perl、Python、PHP
    协议:OpenWire、Stomp Rest、WS Notification、XMPP、AMQP、MQTT

    • 完全支持JMS1.1和J2EE 1.4规范
    • 与Spring很好的集成
    • 支持多种传送协议:TCP、SSL、NIO、UDP、JGroups、JXTA
    • 支持通过JDBC和journal提供高速的消息持久化
    • 实现了高性能的集群模式

    使用场景:

    1. 所有需要使用消息队列的地方
    2. 订单处理、消息通知、服务降级等
    3. 纯java实现,可以支持嵌入到应用系统

    使用教程

    1. activemq下载链接;http://archive.apache.org/dist/activemq/

    2. 下载最新版解压,然后进入apache-activemq-5.16.3-binapache-activemq-5.16.3inwin64目录,点击activemq.bat启动。

    如果出现端口占用,可以conf/activemq.xml修改端口。

    1. 引入依赖:
    <dependency>
    			<groupId>org.apache.activemq</groupId>
    			<artifactId>activemq-core</artifactId>
    			<version>5.7.0</version>
     </dependency>
    
    1. 点对点
    package com.mmc.springbootstudy.activemq.p2p;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class Producer {
    
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue("test-queue");
            //生产者
            MessageProducer producer = session.createProducer(queue);
            //设置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            //发送消息
            for (int i = 0; i < 1; i++) {
                sendMsg(session,producer,i);
            }
            session.commit();
            connection.close();
        }
    
        private static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
            TextMessage textMessage = session.createTextMessage("Hello ActiveMQ " + i);
            producer.send(textMessage);
        }
    }
    
    
    package com.mmc.springbootstudy.activemq.p2p;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * 接收者
     */
    public class JmsReceiver {
    
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue("test-queue");
            MessageConsumer consumer = session.createConsumer(queue);
            while (true){
                TextMessage receive = (TextMessage) consumer.receive();
                System.out.println(receive.getText());
                receive.acknowledge();
            }
    
        }
    }
    
    
    1. 发布-订阅
    public class TopSend {
    
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(null);
            TextMessage textMessage = session.createTextMessage("hello,topic");
            Topic topic = session.createTopic("test-topic");
            producer.send(topic,textMessage);
        }
    }
    
    public class TopReceiver {
    
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Topic topic = session.createTopic("test-topic");
            MessageConsumer consumer = session.createConsumer(topic);
            while (true){
                TextMessage receive = (TextMessage) consumer.receive();
                System.out.println(receive.getText());
            }
    
        }
    }
    

    或者接受消息用监听的方式。

    public class TopReceiver {
    
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,"tcp://0.0.0.0:61616");
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Topic topic = session.createTopic("test-topic");
            MessageConsumer consumer = session.createConsumer(topic);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        System.out.println(((TextMessage)message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
    
        }
    }
    

    未完。。。下篇继续讲kafka

    书山有路勤为径,学海无涯苦作舟
  • 相关阅读:
    手动制作openstack windows镜像
    kolla-ansible部署多节点OpenStack-Pike
    手动制作openstack CentOS 镜像
    VNC Viewer连接打开remote display的VMware虚拟机出现闪退
    Host does not support domain type kvm for virtualization type 'hvm' arch 'x86_64'
    ImportError: No module named 'requests.packages.urllib3'
    使用kolla安装的openstack mariadb为集群所有节点无法启动
    kolla-ansible部署单节点OpenStack-Pike
    墨刀--在线原型设计工具
    Pycharm配置支持vue语法
  • 原文地址:https://www.cnblogs.com/javammc/p/15205196.html
Copyright © 2011-2022 走看看