zoukankan      html  css  js  c++  java
  • ActiveMQ 入门

    一 ActiveMQ简介

      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    二 ActiveMQ特性

    ⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    ⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    ⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    ⒍ 支持通过JDBC和journal提供高速的消息持久化
    ⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
    ⒏ 支持Ajax
    ⒐ 支持与Axis的整合
    ⒑ 可以很容易的调用内嵌JMS provider,进行测试

     三 ActiveMQ安装

    ActiveMQ在windows服务上安装操作如下:

        1.在官网下载activemq安装文件。地址:http://activemq.apache.org/download.html

        2.下载的tar.gz安装文件到windows服务器上,并解压到指定目录

        3.运行activemq,进入到解压的 apache-activemq-5.15.2/,根据你的电脑的位数,进入win32或者win64,执行activemq.bat

        4.开放端口8161,61616,保证端口可访问。

       运行activemq截图如下:

     

     本机访问启动成功的activemq截图如下:

    四 ActivemQ 开发

    Point-to-Point (点对点)消息模式

    1)原理图

    2)代码 下面的样例代码是在SpringBoot项目中的,不明白的同学可以先去看看SpringBoot

    message生产者

    package com.bTest.demo;
    
    import java.io.Serializable;
    import java.util.Date;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import redis.clients.jedis.Jedis;
    
    @RestController
    @SpringBootApplication
    @EnableAutoConfiguration
    public class Demo4Application {
    
    	public static void main(String[] args) {
    		SpringApplication.run(Demo4Application.class, args);
    	}
    
    	@RequestMapping("/hello")
    	public String sysoHello() {
    		return new Date().toString();
    	}
    
    	@RequestMapping("/redis")
    	public String redis() {
    		Jedis jedis = new Jedis("192.168.1.100");
    		jedis.auth("cy");
    		System.out.println("连接成功");
    		// 查看服务是否运行
    		System.out.println("服务正在运行: " + jedis.ping());
    		return "服务正在运行: " + jedis.ping();
    	}
    
    	// activeMQ
    	@RequestMapping("/sendMq")
    	public void sendMq() throws JMSException {
    		ActiveMQConnectionFactory mqConnection = new ActiveMQConnectionFactory("admin1", "admin", "tcp://127.0.0.1:61616");
    		Connection connection = mqConnection.createConnection();
    		connection.start();
    		Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    		Destination mqRequest = session.createQueue("request-queue");
    		Destination mqResponse = session.createQueue("response-queue");
    		MessageProducer producer = session.createProducer(mqRequest);
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		MessageConsumer consumer = session.createConsumer(mqResponse);
    		TestMqBean bean = new TestMqBean();
    		bean.setName("小姑娘");
    		for (int i = 0; i < 10; i++) {
    			//bean.setAge("18"+i);
    			//producer.send(session.createObjectMessage(bean));
    			producer.send(session.createTextMessage("第"+i+"次 :这只是个MQ的测试!"));
    		}
    		producer.close();
    		System.out.println("MQ消息发送成功!");
    		consumer.setMessageListener(new MessageListener() {
    			
    			@Override
    			public void onMessage(Message message) {
    				// TODO Auto-generated method stub
    				if(null != message){
    					TextMessage textMsg = (TextMessage) message;
    					try {
    						System.out.println("收到反馈消息:"+textMsg.getText());
    					} catch (JMSException e) {
    						// TODO Auto-generated catch block
    						e.printStackTrace();
    					}
    				}
    			}
    			
    		});
    	}
    }
    

     message消费者

    package com.bTest.demo;
    
    import java.util.Date;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.ObjectMessage;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    @SpringBootApplication
    @EnableAutoConfiguration
    public class Demo3Application {
    
    	public static void main(String[] args) {
    		SpringApplication.run(Demo3Application.class, args);
    	}
    	@RequestMapping("/hello")
    	public String sysoHello(){
    		return new Date().toString();
    	}
    	//接收消息
    	@RequestMapping("/receiveMq")
    	public void receiveMqMessage() throws JMSException{
    		ActiveMQConnectionFactory mqConnection = new ActiveMQConnectionFactory("admin1", "admin", "tcp://127.0.0.1:61616");
    		Connection connection = mqConnection.createConnection();
    		connection.start();
    		Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
    		Destination mqRequest = session.createQueue("request-queue");
    		Destination meResponse = session.createQueue("response-queue");
    		MessageProducer producer = session.createProducer(meResponse);
    		MessageConsumer consumer = session.createConsumer(mqRequest);
    		
    		producer= session.createProducer(meResponse);
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		consumer.setMessageListener(new MessageListener() {
    			
    			@Override
    			public void onMessage(Message message) {
    				// TODO Auto-generated method stub
    			TextMessage textMsg = (TextMessage) message;
    			try {
    				System.out.println("接收到的消息是:"+textMsg.getText());
    			} catch (JMSException e) {
    				// TODO Auto-generated catch block
    				e.printStackTrace();
    			}
    			}
    		});
    		producer.send(session.createTextMessage("成功接收到MQ消息!"));
    	}
    }
    

      

    Publisher/Subscriber(发布/订阅者)消息模式开发流程 

    1)原理图

    2)代码   稍后会持续更新。。。

  • 相关阅读:
    Deepin Linux下安装安卓应用的各种方式
    win下的终端使用指南
    IDEA自定义TODO
    WSL的ssh-agent问题
    MySQL数据类型
    MySQL常用命令.md
    Period 时间坑
    exp/imp管理
    expdp和impdp管理(逻辑导入导出)
    同义词
  • 原文地址:https://www.cnblogs.com/freud/p/8295061.html
Copyright © 2011-2022 走看看