zoukankan      html  css  js  c++  java
  • ActiveMQ的介绍及使用

    一、消息中间件概述

    什么是消息中间件

    发送者将消息发送给消息服务器,消息服务器将消感存放在若千队列中,在合适的时候再将消息转发给接收者。
    这种模式下,发送和接收是异步的,发送者无需等待; 二者的生命周期未必相同: 发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行;一对多通信: 对于一个消息可以有多个接收者。
    

    二、JMS的介绍

    2.1什么是JMS?

    JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。
    

    2.2消息模型

    • P2P (点对点):
      1.P2P模式图

      里面涉及到的概念以及其中的特点
      1.消息队列(Queue)
      2.发送者(Sender)
      3.接收者(Receiver)
      4.每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。
      特点:
      1.每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
      2.发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
      3.接收者在成功接收消息之后需向队列应答成功
    • Pub/Sub (发布与订阅)
      Pub/Sub模式图

    里面涉及到的概念以及特点
    1.主题(Topic)
    2.发布者(Publisher)
    3.订阅者(Subscriber)
    客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
    Pub/Sub的特点
    1.每个消息可以有多个消费者
    2.发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

    • 消息的消费的两个方式
    1. 同步
      订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。
      2.异步
      订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

    应用场景:

    用户注册、订单修改库存、日志存储

    ActiveMQ的使用

    window下 ActiveMQ安装

    解压当前activeMQ

    进入bin目录下启动64位系统下的

    启动成功访问页面:

    使用ActiveMQ完成点对点(p2p)通讯模式

    引入pom文件依赖

      <dependencies>
    		<dependency>
    			<groupId>org.apache.activemq</groupId>
    			<artifactId>activemq-core</artifactId>
    			<version>5.7.0</version>
    		</dependency>
    	</dependencies>
    
    

    生产者代码

    	public static void main(String[] args) throws JMSException {
    		//连接工厂JMS 用它创建连接
    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    		
    		// Session: 一个发送或接收消息的线程
    		Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    		// Destination :消息的目的地;消息发送给谁.
    		// 获取session注意参数值xiaobai是Query的名字
    
    		Destination destination = session.createQueue("xiaobai");
    		//MessageProducer:消息生产者
    		MessageProducer producer = session.createProducer(destination);
    		
    		//设置持久化
    		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    		for (int i = 0; i < 5; i++) {
    			System.out.println("我是生产者" + i);
    			sendMsg(session, producer, "我是生产者" + i);
    			session.commit();
    		}
    		
    		System.out.println("我是生产者发送完毕" );
    	}
    
    	public static void sendMsg(Session session, MessageProducer producer, String i) throws JMSException {
    		TextMessage textMessage = session.createTextMessage("hello activemq" + i);
    		producer.send(textMessage);
    
    	}
    

    消费者代码:

    	//连接工厂,JMS 用它创建连接
    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    		//Session: 一个发送或接收消息的线程
    		Session session = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    		Destination destination = session.createQueue("xiaobai");
    		MessageConsumer createConsumer = session.createConsumer(destination);
    		while (true) {
    			TextMessage textMessage = (TextMessage) createConsumer.receive();
    			if (textMessage != null) {
    				String text = textMessage.getText();
    				System.out.println(text);
    //				textMessage.acknowledge();
    				session.commit();
    			} else {
    				break;
    			}
    			System.out.println("消费者消费完毕");
    		}
    

    启动生产者,在消息中间查看数据

    启动消费者,查看消费情况

    发布订阅

    生产者代码:

    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    
    		Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    		MessageProducer producer = session.createProducer(null);
    
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		for (int i = 0; i < 5; i++) {
    			System.out.println("我是生产者" + i);
    			sendMsg(session, producer, "我是生产者" + i);
    		}
    		
    		System.out.println("我是生产者发送完毕" );
    	}
    
    	public static void sendMsg(Session session, MessageProducer producer, String i) throws JMSException {
    		TextMessage textMessage = session.createTextMessage("hello activemq" + i);
    		Destination destination = session.createTopic("xiao_topic");
    		producer.send(destination,textMessage);
    
    	}
    

    消费者代码

    		ConnectionFactory collectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
    				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
    		Connection createConnection = collectionFactory.createConnection();
    		createConnection.start();
    
    		Session session = createConnection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    		Destination destination = session.createTopic("xiao_topic");
    		MessageConsumer createConsumer = session.createConsumer(destination);
    		while (true) {
    			TextMessage textMessage = (TextMessage) createConsumer.receive();
    			if (textMessage != null) {
    				String text = textMessage.getText();
    				System.out.println(text);
    			} else {
    				break;
    			}
    			System.out.println("消费者消费完毕");
    		}
    		
    	}
    

    先启动的消费者,后启动生产者

  • 相关阅读:
    Pascal's Triangle II
    Pascal's Triangle
    Best Time to Buy and Sell Stock II
    Best Time to Buy and Sell Stock
    Populating Next Right Pointers in Each Node
    path sum II
    Path Sum
    [转载]小波时频图
    [转载]小波时频图
    [转载]Hilbert变换及谱分析
  • 原文地址:https://www.cnblogs.com/Libbo/p/11546816.html
Copyright © 2011-2022 走看看