zoukankan      html  css  js  c++  java
  • 消息中间件

    消息中间件

    什么是消息中间件

    概述

    • 消息中间件可以理解成就是一个服务软件,保存信息的容器,比如生活中的快递云柜.
    • 我们把数据放到消息中间件当中, 然后通知对应的服务进行获取
    • 消息中间件是在消息的传输过程中保存信息的容器

    消息中间件应用场景

    • 使用消息服务器当做大的队列使用, 先进先出, 来处理高并发写入操作
    • 使用消息服务器可以将业务系统的串行执行改为并行执行, 处理效率高, 更合理的榨取服务器的性能.

    同步与异步技术

    同步技术

    • dubbo是一中同步技术, 实时性高, controller调用service项目, 调用就执行,
    • 如果service项目中的代码没有执行完, controller里面的代码一致等待结果.

    异步技术

    • mq消息中间件技术(jms) 是一种异步技术, 消息发送方, 将消息发送给消息服务器,
    • 消息服务器未必立即处理.什么时候去处理, 主要看消息服务器是否繁忙,
    • 消息进入服务器后会进入队列中, 先进先出.实时性不高.

    JMS

    概述:

    • jms的全称叫做Java message service (Java消息服务) jms是jdk底层定义的规范
    • 各大厂商都是实现这个规范的技术

    jms消息服务器同类型技术

    ActiveMQ
    	是apache的一个比较老牌的消息中间件, 它比较均衡, 既不是最安全的, 也不是最快的.
    	
    RabbitMQ
    	是阿里巴巴的一个消息中间件, 更适合金融类业务, 它对数据的安全性比较高.能够保证数据不丢失.
    	
    Kafka
    	Apache下的一个子项目。特点:高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;适合处理海量数据。
    

    JMS中支持的消息类型

    TextMessage
    	一个字符串对象
    	
    MapMessage
    	key-value
    	
    ObjectMessage
    	一个序列化的 Java 对象
    	
    BytesMessage
    	一个字节的数据流
    	
    StreamMessage
    	Java 原始值的数据流
    

    JMS中的两种发送模式

    点对点模式

    • 一个发送方, 一个接收方.
    • 也可以多个发送方, 一个接收方, 主要是接收方必须是第一个.

    订阅发布模式

    • 一个发送方, 多个接收方.
    • 发送方也可以是多个, 主要看接收方, 接收方必须是多个

    ActiveMQ安装

    (1)将apache-activemq-5.12.0-bin.tar.gz 上传至服务器/usr/local目录下
    (2)解压此文件
    	cd /usr/local
    	tar  zxvf  apache-activemq-5.12.0-bin.tar.gz
    (3)为apache-activemq-5.12.0目录赋权
    	chmod 777 apache-activemq-5.12.0
    (4)进入apache-activemq-5.12.0/bin目录赋与执行权限
    	cd apache-activemq-5.12.0/bin
    	chmod 755 activemq 
      (5)    启动
    	 ./activemq start
    

    启动

    在浏览器当中输入http://192.168.1.88:8161/
    进入管理页面
    用户名和密码都是 admin
    
    说明
    	Number Of Pending Messages  :等待消费的消息 这个是当前未出队列的数量。
    	Number Of Consumers  :消费者 这个是消费者端的消费者数量
    	Messages Enqueued  :进入队列的消息  进入队列的总数量,包括出队列的。
    	Messages Dequeued  :出了队列的消息  可以理解为是消费这消费掉的数量。
    

    快速入门

    创建普通Jar工程

    引入pom依赖

    <dependencies>
    	<dependency>
    		<groupId>org.apache.activemq</groupId>
    		<artifactId>activemq-client</artifactId>
    		<version>5.13.4</version>
    	</dependency>
    </dependencies>
    

    点对点模式Queue

    创建QueueProducer

    public static void main(String[] args) throws Exception{
    	//1.创建连接工厂
    	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
    	//2.获取连接
    	Connection connection = connectionFactory.createConnection();
    	//3.启动连接
    	connection.start();
    	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
    	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    	//5.创建队列对象, 指定发送的队列名称, 队列名称可以随意起名, 但是发送到哪里, 就要从哪里去接收
    	Queue queue = session.createQueue("test-queue");
    	//6.创建消息生产者
    	MessageProducer producer = session.createProducer(queue);
    	//7.创建消息
    	TextMessage textMessage = session.createTextMessage("Hello ActiveMQ");
    	//8.发送消息
    	producer.send(textMessage);
    	//9.关闭资源
    	producer.close();
    	session.close();
    	connection.close();
    }
    

    创建QueueConsumer

    public static void main(String[] args) throws Exception{
    	//1.创建连接工厂
    	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
    	//2.获取连接
    	Connection connection = connectionFactory.createConnection();
    	//3.启动连接
    	connection.start();
    	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
    	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    	//5.创建队列对象
    	Queue queue = session.createQueue("test-queue");
    	//6.创建消息消费
    	MessageConsumer consumer = session.createConsumer(queue);
    	//7.监听消息
    	consumer.setMessageListener(new MessageListener() {
    		public void onMessage(Message message) {
    			TextMessage textMessage=(TextMessage)message;
    			try {
    				System.out.println("接收到消息:"+textMessage.getText());
    			} catch (JMSException e) {
    				e.printStackTrace();
    			}
    		}
    	});
    	//8.等待键盘输入
    	System.in.read();
    	//9.关闭资源
    	consumer.close();
    	session.close();
    	connection.close();
    
    }
    

    订阅发布模式Topic

    TopicConsumer1

    public static void main(String[] args) throws Exception {
    //1.创建连接工厂
    	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
    	//2.获取连接
    	Connection connection = connectionFactory.createConnection();
    	//3.启动连接
    	connection.start();
    	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
    	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    	//5.创建主题对象
    	Topic topic = session.createTopic("test-topic");
    	//6.创建消息消费
    	MessageConsumer consumer = session.createConsumer(topic);
    	//7.监听消息
    	consumer.setMessageListener(new MessageListener() {
    		public void onMessage(Message message) {
    			TextMessage textMessage=(TextMessage)message;
    			try {
    				System.out.println("接收到消息:"+textMessage.getText());
    			} catch (JMSException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    		}
    	});
    	//8.等待键盘输入
    	System.in.read();
    	//9.关闭资源
    	consumer.close();
    	session.close();
    	connection.close();
    }
    

    TopicConsumer2

    public static void main(String[] args) throws Exception {
    	//1.创建连接工厂
    	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
    	//2.获取连接
    	Connection connection = connectionFactory.createConnection();
    	//3.启动连接
    	connection.start();
    	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
    	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    	//5.创建主题对象
    	Topic topic = session.createTopic("test-topic");
    	//6.创建消息消费
    	MessageConsumer consumer = session.createConsumer(topic);
    
        //7.监听消息
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("接收到消息:"+textMessage.getText());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });
        //8.等待键盘输入
        System.in.read();
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    
    }
    

    TopicProducer

    public static void main(String[] args) throws Exception {
    	//1.创建连接工厂
    	ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.1.88:61616");
    	//2.获取连接
    	Connection connection = connectionFactory.createConnection();
    	//3.启动连接
    	connection.start();
    	//4.获取session  (参数1:是否启动事务,参数2:消息确认模式)
    	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    	//5.创建主题对象
    	Topic topic = session.createTopic("test-topic");
    	//6.创建消息生产者
    	MessageProducer producer = session.createProducer(topic);
    	//7.创建消息
    	TextMessage textMessage = session.createTextMessage("Hello Topic ActiveMQ");
    	//8.发送消息
    	producer.send(textMessage);
    	//9.关闭资源
    	producer.close();
    	session.close();
    	connection.close();
    }
    

    消息中间件在工程里的应用

    添加中间所需要的依赖在common工程当中

    <!--activeMq -->
    <dependency>
    	<groupId>org.apache.activemq</groupId>
    	<artifactId>activemq-client</artifactId>
    	<version>5.13.4</version>
    </dependency>
    <dependency>
    	<groupId>org.springframework</groupId>
    	<artifactId>spring-jms</artifactId>
    </dependency>
    <dependency>
    	<groupId>javax.jms</groupId>
    	<artifactId>javax.jms-api</artifactId>
    	<version>2.0.1</version>
    </dependency>
    

    在service_sellergoods工程当中添加配置文件

    spring/applicationContext-jms.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
    	http://www.springframework.org/schema/beans/spring-beans.xsd
    	http://www.springframework.org/schema/context   
    	http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.1.88:61616"/>
        </bean>
    
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
    
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
            <property name="connectionFactory" ref="connectionFactory"/>  
        </bean>   
    
        <!-- 发布订阅模式, 商品导入索引库和生成静态页面 -->
        <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <!--将商品上架所用的商品id发送到这个队列中-->
            <constructor-arg value="topic_page_solr"/>
        </bean>
    
        <!-- 点对点模式,删除索引库-->  
        <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!--将商品下架所用的商品id发送到这个队列中-->
            <constructor-arg value="queue_solr_delete"/>
        </bean>  
    </beans>
    

    在修改商品状态和删除商品时发送消息

    //注入属性		
    @Autowired
    private JmsTemplate jmsTemplate;
    //为商品上架使用
    @Autowired
    private ActiveMQTopic topicPageAndSolrDestination;
    //为商品下架使用
    @Autowired
    private ActiveMQQueue queueSolrDeleteDestination;
     //商品审核通过		
    @Override
    public void updateStatus(Long[] ids, String status) {
    	if (ids != null) {
    		for (final Long id : ids) {
    			//1. 根据商品id修改商品对象状态码
    			Goods goods  = new Goods();
    			goods.setId(id);
    			goods.setAuditStatus(status);
    			goodsDao.updateByPrimaryKeySelective(goods);
    			//2. 根据商品id修改库存集合对象状态码
    			Item item = new Item();
    			item.setStatus(status);
    			ItemQuery query = new ItemQuery();
    			ItemQuery.Criteria criteria = query.createCriteria();
    			criteria.andGoodsIdEqualTo(id);
    			itemDao.updateByExampleSelective(item, query);
    			/**
    			 * 将商品id作为消息发送给消息服务器
    			 */
    			if ("2".equals(status)) {
    				jmsTemplate.send(topicPageAndSolrDestination, new MessageCreator() {
    					@Override
    					public Message createMessage(Session session) throws JMSException {
    						TextMessage textMessage = session.createTextMessage(String.valueOf(id));
    						return textMessage;
    					}
    				});
    			}
    		}
    	}
    }
    
    //删除商品时
    public void delete(Long[] ids) {
    	if (ids != null) {
    		for (final Long id : ids) {
    			// 1. 到数据库中对商品进行逻辑删除
    			Goods goods = new Goods();
    			goods.setId(id);
    			goods.setIsDelete("1");
    			goodsDao.updateByPrimaryKeySelective(goods);
    			//2 将商品id作为消息发送给消息服务器
    			jmsTemplate.send(queueSolrDeleteDestination, new MessageCreator() {
    				@Override
    				public Message createMessage(Session session) throws JMSException {
    					TextMessage textMessage = session.createTextMessage(String.valueOf(id));
    					return textMessage;
    				}
    			});
    
    		}
    	}
    }
    

    在service_page工程当中监听消息生成静态页面

    添加配置文件

    spring/applicationContext-jms-consumer.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
    	http://www.springframework.org/schema/beans/spring-beans.xsd
    	http://www.springframework.org/schema/context   
    	http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.1.88:61616"/>
        </bean>
    
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
    
        <!--发布订阅模式, 生成页面-->  
        <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <!--指定从这个队列中获取上架的商品id-->
            <constructor-arg value="topic_page_solr"/>
        </bean>    
    
        <!-- 发布订阅模式, 消息监听容器   生成页面 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicPageAndSolrDestination" />
            <property name="messageListener" ref="pageListener" />
        </bean>
        <bean id="pageListener" class="com.itxk.core.listener.PageListener"></bean>
    
    </beans>
    

    编写监听器

    public class PageListener implements MessageListener {
        @Autowired
        private CmsService cmsService;
        @Override
        public void onMessage(Message message) {
            ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
            try {
                String goodsId = atm.getText();
                Map<String, Object> goodsData = cmsService.findGoodsData(Long.parseLong(goodsId));
                cmsService.createStaticPage(Long.parseLong(goodsId), goodsData);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    在service_search工程当中监听消息添加和删除solr商品信息

    添加配置文件

    spring/applicationContext-jms-consumer.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
    	http://www.springframework.org/schema/beans/spring-beans.xsd
    	http://www.springframework.org/schema/context   
    	http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://192.168.1.88:61616"/>
        </bean>
    
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  
    
        <!--发布订阅模式, 将数据导入solr索引库-->  
        <bean id="topicPageAndSolrDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <!-- 指定从哪个队列中去接收上架业务的商品id -->
            <constructor-arg value="topic_page_solr"/>
        </bean>    
    
        <!-- 发布订阅模式, 消息监听容器, 将数据导入solr索引库 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="topicPageAndSolrDestination" />
            <property name="messageListener" ref="pageAndSolrListener" />
        </bean>
        <bean id="pageAndSolrListener" class="com.itxk.core.listener.ItemSearchListener"></bean>
    
        <!-- 点对点模式,删除索引库-->
        <bean id="queueSolrDeleteDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!--指定从这个队列中去接收下架的商品id-->
            <constructor-arg value="queue_solr_delete"/>
        </bean>
    
        <!-- 点对点模式, 消息监听容器  删除索引库-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueSolrDeleteDestination" />
            <property name="messageListener" ref="itemDeleteListener" />
        </bean>
        <bean id="itemDeleteListener" class="com.itxk.core.listener.ItemDeleteListener"></bean>
    
    </beans>
    

    添加监听器

    ItemSearchListener
    public class ItemSearchListener implements MessageListener {
        @Autowired
        private SolrManagerService solrManagerService;
        @Autowired
        private ItemDao itemDao;
        @Override
        public void onMessage(Message message) {
            //为了方便获取文本消息, 将原生的消息对象转换成activeMq的文本消息对象
            ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
            try {
                String goodsId = atm.getText();
                ItemQuery query = new ItemQuery();
                ItemQuery.Criteria criteria = query.createCriteria();
                //查询指定商品的库存数据
                criteria.andGoodsIdEqualTo(Long.parseLong(goodsId));
                List<Item> items = itemDao.selectByExample(query);
                if (items != null) {
                    for (Item item : items) {
                        //获取规格json格式字符串
                        String specJsonStr = item.getSpec();
                        Map map = JSON.parseObject(specJsonStr, Map.class);
                        item.setSpecMap(map);
                    }
                }
                solrManagerService.saveItemToSolr(items);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    ItemDeleteListener
    public class ItemDeleteListener implements MessageListener {
        @Autowired
        private SolrManagerService solrManagerService;
        @Override
        public void onMessage(Message message) {
            ActiveMQTextMessage atm = (ActiveMQTextMessage)message;
            try {
                String goodsId = atm.getText();
                ArrayList<Object> itemGoodsID = new ArrayList<>();
                itemGoodsID.add(Long.parseLong(goodsId));
                solrManagerService.deleteItemByGoodsId(itemGoodsID);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
  • 相关阅读:
    Djang下载虚拟环境设置
    数据类型和变量
    zedboard如何从PL端控制DDR读写(三)——AXI-FULL总线调试
    zedboard如何从PL端控制DDR读写(二)——AXI总线
    zedboard如何从PL端控制DDR读写(一)
    用python3统计代码行数
    内存电路分析-设计框图和金手指
    钢铁侠也要换成女版的了???
    【转】DDR3详解(以Micron MT41J128M8 1Gb DDR3 SDRAM为例)
    verilog中符号位的扩展问题
  • 原文地址:https://www.cnblogs.com/mumuyinxin/p/11709217.html
Copyright © 2011-2022 走看看