ActiveMQ
开源消息总线
MQ:message queue=消息队列
JMS
Java Message Service = Java 消息服务(用于不同系统之间交换信息)
队列(queue)
功能:对消息进行排队,采用点对点(一对一)模式:每条消息只会被一个消费者接收并处理
queue信息发送与接收
流程:创建连接工厂-->创建连接并启动-->创建会话-->通过会话创建队列-->基于该队列创建生产者-->创建消息-->生产者发送消息
信息发送
package com.test.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; public class QueueSender { //携程发送消息查询✈票 public static void send(){ //东航的服务器 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); //连接 try { Connection con = factory.createConnection(); con.start(); //会话 //AUTO_ACKNOWLEDGE创建会话的时候自动确认连接正确 Session session = con.createSession(false,Session.AUTO_ACKNOWLEDGE); //创建队列---------给队列起名first Queue queue = session.createQueue("first"); //消息生产者 MessageProducer producer = session.createProducer(queue); //消息 TextMessage message = session.createTextMessage("第5条message:hello word"); //发送 producer.send(message); session.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { send(); } }
信息接收
流程:创建连接工厂-->创建连接并启动-->创建会话-->通过会话创建队列-->基于该队列创建消费者-->监听队列-->获取发送来的消息
package com.test.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * Created by MY on 2017/8/2. */ public class QueueConsumer { //东航读取消息 public static void recevice(){ ActiveMQConnectionFactory factory= new ActiveMQConnectionFactory("tcp://localhost:61616"); try { Connection con=factory.createConnection(); con.start(); //会话 Session session=con.createSession(false, Session.AUTO_ACKNOWLEDGE); //队列 Queue queue=session.createQueue("first"); MessageConsumer meg=session.createConsumer(queue); //监听队列 meg.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { TextMessage msg=(TextMessage)message; System.out.println(msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { recevice(); } }
Topic(主题):
功能:广播(发布/订阅),一对多:同一条消息会发送给所有订阅了该主题的消费者(非持久订阅时,消费者需要先于发送者启动,否则收不到之前发布的消息)
package com.test.topic;

import com.test.entity.Book;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Publishes a single {@code Book} as an {@link ObjectMessage} to the topic
 * "topic-first" on a local ActiveMQ broker.
 *
 * Created by MY on 2017/8/2.
 */
public class TopicSender {

    /**
     * Connects to the broker at {@code tcp://localhost:61616}, publishes one
     * {@code Book} titled "红楼梦" to the topic "topic-first", and then closes
     * the connection.
     *
     * <p>JMS failures are printed to stderr and are not propagated to the caller.
     */
    public static void send() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        Connection con = null;
        try {
            // The connection is not started: start() only governs delivery of
            // incoming messages, and this class only sends.
            con = factory.createConnection();
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("topic-first");
            MessageProducer producer = session.createProducer(topic);

            // NOTE(review): Book presumably implements java.io.Serializable, which
            // createObjectMessage requires - confirm in com.test.entity.Book.
            Book bk = new Book();
            bk.setTitle("红楼梦");

            ObjectMessage om = session.createObjectMessage(bk);
            producer.send(om);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions and producers. Doing it in
            // finally releases the socket even when publishing fails part-way.
            if (con != null) {
                try {
                    con.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        send();
    }
}

package com.test.topic;

import com.test.entity.Book;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * First subscriber of the topic "topic-first"; prints the title of every
 * {@code Book} it receives, prefixed with "1".
 *
 * Created by MY on 2017/8/2.
 */
public class TopicConsumer {

    /**
     * Subscribes to the topic "topic-first" on the broker at
     * {@code tcp://localhost:61616} and registers an asynchronous listener.
     * Messages that are not an {@link ObjectMessage} carrying a {@code Book}
     * are reported on stderr and skipped.
     *
     * <p>The connection is not closed here: the listener needs the live connection
     * to keep receiving messages after this method returns.
     *
     * <p>JMS failures are printed to stderr and are not propagated to the caller.
     */
    public static void listen() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // NOTE(review): trusting all packages lets the broker make this client
        // deserialize arbitrary classes. Acceptable for a local demo; for anything
        // else restrict it with setTrustedPackages(...) to the packages actually needed.
        factory.setTrustAllPackages(true);
        try {
            Connection con = factory.createConnection();
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("topic-first");
            MessageConsumer consumer = session.createConsumer(topic);

            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Guard both casts so an unexpected message cannot throw
                    // ClassCastException from inside the listener.
                    if (!(message instanceof ObjectMessage)) {
                        System.err.println("Ignoring non-object message: " + message);
                        return;
                    }
                    try {
                        Object payload = ((ObjectMessage) message).getObject();
                        if (!(payload instanceof Book)) {
                            System.err.println("Ignoring unexpected payload: " + payload);
                            return;
                        }
                        System.out.println("1" + ((Book) payload).getTitle());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            // Start delivery only once the listener is in place.
            con.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        listen();
    }
}

package com.test.topic;

import com.test.entity.Book;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * Second subscriber of the topic "topic-first"; prints the title of every
 * {@code Book} it receives, prefixed with "2:".
 *
 * Created by MY on 2017/8/2.
 */
public class TopicConsumer2 {

    /**
     * Subscribes to the topic "topic-first" on the broker at
     * {@code tcp://localhost:61616} and registers an asynchronous listener.
     * Messages that are not an {@link ObjectMessage} carrying a {@code Book}
     * are reported on stderr and skipped.
     *
     * <p>The connection is not closed here: the listener needs the live connection
     * to keep receiving messages after this method returns.
     *
     * <p>JMS failures are printed to stderr and are not propagated to the caller.
     */
    public static void listen2() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // NOTE(review): trusting all packages lets the broker make this client
        // deserialize arbitrary classes. Acceptable for a local demo; for anything
        // else restrict it with setTrustedPackages(...) to the packages actually needed.
        factory.setTrustAllPackages(true);
        try {
            Connection con = factory.createConnection();
            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("topic-first");
            MessageConsumer consumer = session.createConsumer(topic);

            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    // Guard both casts so an unexpected message cannot throw
                    // ClassCastException from inside the listener.
                    if (!(message instanceof ObjectMessage)) {
                        System.err.println("Ignoring non-object message: " + message);
                        return;
                    }
                    try {
                        Object payload = ((ObjectMessage) message).getObject();
                        if (!(payload instanceof Book)) {
                            System.err.println("Ignoring unexpected payload: " + payload);
                            return;
                        }
                        System.out.println("2:" + ((Book) payload).getTitle());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            // Start delivery only once the listener is in place.
            con.start();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        listen2();
    }
}