zoukankan      html  css  js  c++  java
  • 消息中间件

    1、什么是消息中间件?

          关注于数据的发送和接收,利用高效可靠地异步消息传递机制集成分布式系统。

    2、什么是JMS? java - api

          Java 消息服务(Java Message Service)即JMS, 是一个Java 平台(不能跨语言)中关于面向消息中间件的API,用户两个应用程序之间,或分布式系统中发送消息,进行异步通信。

    3、AMQP协议(为了跨语言) Wire-protocaol, 只支持 byte[] 二进制的消息类型

       AMQP(advanced message queuing protocol) 是一个提供统一消息服务的应用层标准协议,给予此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

    4、常用的消息中间件

          <1> ActiveMQ 完全支持JMS1.1与J2EE 1.4规范的JMS Provider实现。

                 支持多种语言 Java c c++ c# Ruby Perl Python PHP,支持的应用协议:OpenWire, Stomp REST, WS Notification, Xmpp, AMQP  (没有JMS 因为不是应用协议,只是开发规范)。

          <2> RabbitMQ  是一个开源的AMQP实现,服务器端使用Erlang语言编写,用于在分布式系统中存储转发消息,在易用性和扩展性 高可用等方便变现不俗

                 支持多种客户端, 如 Python Ruby .Net  java jms c php等

          <3> kafka  是一种高吞吐量的分布式发布订阅消息系统,是一个分布式的,分区的,可靠地分布式日志存储服务,它通过一种独一无二的设计提供了一个消息系统的功能。不是一个严格的消息中间件,

         主要是用来做日志储存。即使是非常普通的硬件kafka也可以支持每秒数百万的消息。

         比对:

         

    5、消息模式

       主题和队列两种模式。

          队列模型: 客户端包含生产者和消费者

                             队列中的消息只能被一个消费者消费

                             消费者可以随时消费队列中的消息

         主题模型:  客户端包括发布者和订阅者

             主题中的消息被所有的订阅者消费

             消费者不能消费订阅之前就发送到主题中的消息

    6、JMS 编码接口之间的关系

    7、队列模式

    在linux上安装activemq部分此处省略

    安装完后打开activemq平台,访问地址http://192.168.37.128:8161,登录账号和密码都是admin

    下面我们来创建一个生产者:

    public class AppProducer {
    
    	private static final String URL = "tcp://192.168.37.128:61616";
    	private static final String QUEUE_NAME = "queue-test";
    	
    	public static void main(String[] args) throws Exception{
    	   
    		// 创建连接工厂
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    		// 建立连接
    		Connection connection = connectionFactory.createConnection();
    		// 启动连接
    		connection.start();
    		// 创建会话
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 创建一个目标
    		Destination destination = session.createQueue(QUEUE_NAME);
    		// 创建一个生产者
    		MessageProducer producer = session.createProducer(destination);
    		// 发送消息
    		for(int i=0; i < 100; i++){
    			// 创建消息
    			TextMessage textMessage = session.createTextMessage("test" + i);
    			// 发送消息
    			producer.send(textMessage);
                System.out.println("send" + textMessage.getText());					
    		}
    		// 关闭连接
    		connection.close();
    	
    	}
    }

     运行成功后,我们查看activemq平台,发现队列中有100个待消费的消息:

    ok, 下面我们来创建一个消费者:

    public class AppConsumer {
    
    	private static final String URL = "tcp://192.168.37.128:61616";
    	private static final String QUEUE_NAME = "queue-test";
    	
    	public static void main(String[] args) throws Exception{
    	   
    		// 创建连接工厂
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    		// 建立连接
    		Connection connection = connectionFactory.createConnection();
    		// 启动连接
    		connection.start();
    		// 创建会话
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 创建一个目标
    		Destination destination = session.createQueue(QUEUE_NAME);
    		// 创建一个消费者
    		MessageConsumer consumer = session.createConsumer(destination);
    		// 消费消息
    		consumer.setMessageListener(new MessageListener() {
    			
    			@Override
    			public void onMessage(Message arg0) {
    				TextMessage textMessage = (TextMessage)arg0;
    				System.out.println("receive" + textMessage);
    				
    			}
    		});
    	
    	}
    }
    

    运行成功后,我们查看activemq平台,发现队列中100个待消费的消息全部被消费:

    8 主题模式  主题模式下,消费者必须要比订阅者先启动,否则不会收到消息。

       主题模式和队列模式,代码机会相同,只需要在创建目标的时候改成创建主题。

    public class AppTopicProducer {
    
    	private static final String URL = "tcp://192.168.37.128:61616";
    	private static final String QUEUE_NAME = "queue-test";
    	
    	public static void main(String[] args) throws Exception{
    	   
    		// 创建连接工厂
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    		// 建立连接
    		Connection connection = connectionFactory.createConnection();
    		// 启动连接
    		connection.start();
    		// 创建会话
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 创建一个目标
    		Destination destination = session.createTopic(QUEUE_NAME);
    		// 创建一个生产者
    		MessageProducer producer = session.createProducer(destination);
    		// 发送消息
    		for(int i=0; i < 100; i++){
    			// 创建消息
    			TextMessage textMessage = session.createTextMessage("test" + i);
    			// 发送消息
    			producer.send(textMessage);
                System.out.println("send" + textMessage.getText());					
    		}
    		// 关闭连接
    		connection.close();
    	
    	}
    }
    

      

    public class AppTopicConsumer {
    
    	private static final String URL = "tcp://192.168.37.128:61616";
    	private static final String QUEUE_NAME = "queue-test";
    	
    	public static void main(String[] args) throws Exception{
    	   
    		// 创建连接工厂
    		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    		// 建立连接
    		Connection connection = connectionFactory.createConnection();
    		// 启动连接
    		connection.start();
    		// 创建会话
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		// 创建一个目标
    		Destination destination = session.createTopic(QUEUE_NAME);
    		// 创建一个消费者
    		MessageConsumer consumer = session.createConsumer(destination);
    		// 消费消息
    		consumer.setMessageListener(new MessageListener() {
    			
    			@Override
    			public void onMessage(Message arg0) {
    				TextMessage textMessage = (TextMessage)arg0;
    				System.out.println("receive" + textMessage);
    				
    			}
    		});
    	
    	}
    }
    

    之所以 会是 200个消息,但是只消费了 100个, 是因为在 消费者订阅之前, 就已经产生了100个消息,所以这100个不会被消费。

     9  ActiveMQ集群配置

    为什么要对消息中间件集群?

      实现高可用,以排除单点故障引起的服务中断

      实现负载均衡,以提升效率为更多的客户提供服务

    集群方式:

           客户端集群:让多个消费者消费同一队列

        Broker clusters:多个Broker之间同步消息

           Master Slave:实现高可用

    客户端配置

          ActiveMQ 失效转移(failover)

          允许当其中一台消息服务器宕机时,客户端在传输层上重新连接到其他消息服务器。

          语法:failover:(url1,...,urln)?transportOptions

          transportOptions 说明:

                randomize 默认为true, 表示在URL列表中选择URL链接时 是否采用随机策略

                initialReconnectDelay 默认为10,单位毫秒,表示第一次尝试重连之间等待时间

                maxReconnectDelay 默认30000, 单位毫秒,最长重连的时间间隔

    Broker Cluster 集群配置

               原理: 

               节点A的消息 可以同步到 节点B, 节点B的消息也可以同步到节点A上, 节点A产生的消息可以被节点B的消费者消费, 节点B产生的消息可以被节点A的消费者消费。

    Master/Slave集群配置

    集群方案

          Share nothing storage master/slave (已过时,5.8+后移除)

          Shared storage master/slave 共享存储

               Replicated LevelDB Store 基于复制的LevelDB Store   === > 基于zookeeper 的 master 选择方案

               共享存储集群的原理

                

                当前 节点A获得锁资源,成为master , 节点B就无法获得锁资源,一直在等待。如果某个时刻,节点A挂掉,那么节点B会获得锁资源,成为master, 节点A 成为slave

                     

                下面是 基于复制的LevelDB Store 的原理

                

                基于zk, 此时节点A被zk选举为Master,  此时节点A作为与外界沟通的口子,当A接收到新的信息,则会在本地进行存储,并且通过zk 传输到节点B和节点C上进行本地存储。

    两种集群方式对比:

               master/slave 方式 只支持高可用,但是不支持负载均衡。  

               Broker Cluster 支持 负载均衡,但是不支持高可用。

    下面来看一种 即可以高可用,又可以负载均衡的方式。

          我们使用三台服务器的完美集群方案:

    节点B和节点C实现高可用, 节点A为 节点B 或者C 的负载均衡。 但是此方案,一旦节点B 和 节点C 全部挂掉,那么整个系统也就挂掉了,所以我们要使用更多太服务器来防止多台服务器宕机的场景。

  • 相关阅读:
    【转】win7“您可能没有权限使用网络资源”的解决办法
    windows下顽固软件卸载不了的解决方法
    【转】Windows Server 2008修改远程桌面连接数
    winserver2008,运行可执行文件,提示 激活上下文生成失败。 找不到从属程序集 Microsoft.VC90.DebugCRT,processorArchitecture="x86"
    保障视频4G传输的流畅性,海康威视摄像头相关设置
    【转】win7如何设置共享目录,并且访问不需要输入用户名和密码。
    CentOS7.1配置远程桌面
    C++遍历目录,并把目录里超过7天的文件删除(跨平台windows&linux)
    hibernate(二)一级缓存和三种状态解析
    Android进程间的通信之AIDL
  • 原文地址:https://www.cnblogs.com/huxipeng/p/8598310.html
Copyright © 2011-2022 走看看