zoukankan      html  css  js  c++  java
  • ActiveMQ的介绍及使用

    一、消息中间件概述

    什么是消息中间件

    发送者将消息发送给消息服务器,消息服务器将消感存放在若千队列中,在合适的时候再将消息转发给接收者。
    这种模式下,发送和接收是异步的,发送者无需等待; 二者的生命周期未必相同: 发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行;一对多通信: 对于一个消息可以有多个接收者。
    

    二、JMS的介绍

    2.1什么是JMS?

    JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
    

    2.2消息模型

    • P2P (点对点):
      1.P2P模式图

      里面涉及到的概念以及其中的特点
      1.消息队列(Queue)
      2.发送者(Sender)
      3.接收者(Receiver)
      4.每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
      特点:
      1.每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
      2.发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
      3.接收者在成功接收消息之后需向队列应答成功
    • Pub/Sub (发布与订阅)
      Pub/Sub模式图

    里面涉及到的概念以及特点
    1.主题(Topic)
    2.发布者(Publisher)
    3.订阅者(Subscriber)
    客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
    Pub/Sub的特点
    1.每个消息可以有多个消费者
    2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

    • 消息的消费的两个方式
    1. 同步
      订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。
      2.异步
      订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

    应用场景:

    用户注册、订单修改库存、日志存储

    ActiveMQ的使用

    window下 ActiveMQ安装

    解压当前activeMQ

    进入bin目录下启动64位系统下的

    启动成功访问页面:

    使用ActiveMQ完成点对点(p2p)通讯模式

    引入pom文件依赖

      <dependencies>
    		<dependency>
    			<groupId>org.apache.activemq</groupId>
    			<artifactId>activemq-core</artifactId>
    			<version>5.7.0</version>
    		</dependency>
    	</dependencies>
    
    

    生产者代码

    	public static void main(String[] args) throws JMSException {
    		//连接工厂JMS 用它创建连接
    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    		
    		// Session: 一个发送或接收消息的线程
    		Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    		// Destination :消息的目的地;消息发送给谁.
    		// 获取session注意参数值xiaobai是Query的名字
    
    		Destination destination = session.createQueue("xiaobai");
    		//MessageProducer:消息生产者
    		MessageProducer producer = session.createProducer(destination);
    		
    		//设置持久化
    		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    		for (int i = 0; i < 5; i++) {
    			System.out.println("我是生产者" + i);
    			sendMsg(session, producer, "我是生产者" + i);
    			session.commit();
    		}
    		
    		System.out.println("我是生产者发送完毕" );
    	}
    
    	public static void sendMsg(Session session, MessageProducer producer, String i) throws JMSException {
    		TextMessage textMessage = session.createTextMessage("hello activemq" + i);
    		producer.send(textMessage);
    
    	}
    

    消费者代码:

    	//连接工厂,JMS 用它创建连接
    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    		//Session: 一个发送或接收消息的线程
    		Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    		Destination destination = session.createQueue("xiaobai");
    		MessageConsumer createConsumer = session.createConsumer(destination);
    		while (true) {
    			TextMessage textMessage = (TextMessage) createConsumer.receive();
    			if (textMessage != null) {
    				String text = textMessage.getText();
    				System.out.println(text);
    //				textMessage.acknowledge();
    				session.commit();
    			} else {
    				break;
    			}
    			System.out.println("消费者消费完毕");
    		}
    

    启动生产者,在消息中间查看数据

    启动消费者,查看消费情况

    发布订阅

    生产者代码:

    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    
    		Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    		MessageProducer producer = session.createProducer(null);
    
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		for (int i = 0; i < 5; i++) {
    			System.out.println("我是生产者" + i);
    			sendMsg(session, producer, "我是生产者" + i);
    		}
    		
    		System.out.println("我是生产者发送完毕" );
    	}
    
    	public static void sendMsg(Session session, MessageProducer producer, String i) throws JMSException {
    		TextMessage textMessage = session.createTextMessage("hello activemq" + i);
    		Destination destination = session.createTopic("xiao_topic");
    		producer.send(destination,textMessage);
    
    	}
    

    消费者代码

    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    
    		Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    		Destination destination = session.createTopic("xiao_topic");
    		MessageConsumer createConsumer = session.createConsumer(destination);
    		while (true) {
    			TextMessage textMessage = (TextMessage) createConsumer.receive();
    			if (textMessage != null) {
    				String text = textMessage.getText();
    				System.out.println(text);
    			} else {
    				break;
    			}
    			System.out.println("消费者消费完毕");
    		}
    		
    	}
    

    先启动的消费者,后启动生产者

  • 相关阅读:
    假期进度报告2
    假期进度报告1
    JavaScript下判断元素是否存在
    浪潮之巅阅读笔记06
    浪潮之巅阅读笔记05
    浪潮之巅阅读笔记04
    【C语言】C语言简介
    iOS网络监测方法
    iOS常用手势识别器
    【CoreData】 简单地使用
  • 原文地址:https://www.cnblogs.com/Libbo/p/11546816.html
Copyright © 2011-2022 走看看