zoukankan      html  css  js  c++  java
  • ActiveMQ P2P版的HelloWorld

    1.2 JMS应用程序接口

    ConnectionFactory:

    用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。 管理员

    在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。

    Connection:

    连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允

    许用户创建会话,以发送和接收队列和主题到目标。

    Session:

    表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会

    话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以

    使用回滚操作取消这些消息。一个会话允许用户创建消息生产者来发送消息,创建消息消费者来接收消息。

    Destination:

    目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然

    后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。

    MessageConsumer:

    由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或异步(非阻塞)接收队列和主题类型的消息。

    MessageProducer:

    由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。

    Message:

    是在消费者和生产者之间传送的对象,也就是说从一个应用程序创送到另一个应用程序。一个消息有三个主要部分:

    消息头(必须):包含用于识别和为消息寻找路由的操作设置。

    一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。

    一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。


    消息接口非常灵活,并提供了许多方式来定制消息的内容。

    2.Hello World

    2.0 基本配置

    使用Maven,pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.ygy</groupId>
        <artifactId>activemq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>activemq</name>
        <url>http://maven.apache.org</url>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.10</version>
                <scope>test</scope>
            </dependency>
    
            <!-- activemq,学习中 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.5.6</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.5.6</version>
            </dependency>
    
    
        </dependencies>
    </project>
     

    这里只要引入ActiveMQ的依赖就可以了。

    2.1 P2P版的HelloWorld

    生产者:HelloQueueProducer

    package org.ygy.mq.lesson01;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 最简单的生产者
     * 
     * @author yuguiyang
     * 
     */
    public class HelloQueueProducer {
    	public static void main(String[] args) {
    		// 生产者的主要流程
    		Connection connection = null;
    
    		try {
    			// 1.初始化connection工厂,使用默认的URL
    			//failover://tcp://localhost:61616
    			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    
    			// 2.创建Connection
    			connection = connectionFactory.createConnection();
    
    			// 3.打开连接
    			connection.start();
    
    			// 4.创建Session,(是否支持事务)
    			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// 5.创建消息目标
    			Destination destination = session.createQueue("queue_lesson");
    
    			//6.创建生产者
    			MessageProducer producer = session.createProducer(destination);
    			
    			//7.配置消息是否持久化
    			/*	DeliverMode有2种方式:
    			 * 
    			 	public interface DeliveryMode {
    				    static final int NON_PERSISTENT = 1;//不持久化:服务器重启之后,消息销毁
    				
    				    static final int PERSISTENT = 2;//持久化:服务器重启之后,该消息仍存在
    				}
    			 */
    			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    			
    			//8.初始化要发送的消息
    			TextMessage message = session.createTextMessage("Hello World ! from yuguiyang");
    			
    			//9.发送消息
    			producer.send(message);
    
    		} catch (JMSException e) {
    			e.printStackTrace();
    		} finally{
    			try {
    				//10.关闭连接
    				connection.close();
    			} catch (JMSException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    }
    

      

     

    消费者:HelloQueueConsumer

    package org.ygy.mq.lesson01;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    /**
     * 简单的消费者
     * 
     * @author yuguiyang
     * 
     */
    public class HelloQueueConsumer implements MessageListener {
    
    	@Override
    	public void onMessage(Message message) {
    		//如果消息是TextMessage
    		if (message instanceof TextMessage) {
    			//强制转换一下
    			TextMessage txtMsg = (TextMessage) message;
    			try {
    				//输出接收到的消息
    				System.out.println("HaHa: I'v got " + txtMsg.getText());
    			} catch (JMSException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    	public void receive() {
    		// 消费者的主要流程
    		Connection connection = null;
    
    		try {
    			// 1.初始化connection工厂
    			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    
    			// 2.创建Connection
    			connection = connectionFactory.createConnection();
    
    			// 3.打开连接
    			connection.start();
    
    			// 4.创建session
    			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    			// 5.创建消息目标
    			Destination destination = session.createQueue("queue_lesson");
    
    			//6.创建消费者
    			MessageConsumer consumer = session.createConsumer(destination);
    			
    			//7.配置监听
    			consumer.setMessageListener(this);
    		} catch (JMSException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public static void main(String[] args) {
    		new HelloQueueConsumer().receive();
    	}
    
    }
    

      

     

    3.测试

    代码写好了,我们测试一下

    3.1 启动ActiveMQ服务器

    上一篇博客中,说过,进入到bin目录下,双击 activemq.bat,启动

    启动后,访问 http://localhost:8161/admin/

    可能会让你输入用户名和密码 ,这里默认的用户名:admin;密码:admin

    然后,我们单击那个 Queues菜单:

    这里默认应该什么都没有,有的话,也没事

    3.2运行程序

    我们先运行生产者,运行完之后,刷新一下,上面的界面:

    可以看到上面的记录

    这里显示的是服务器上的队列,

    Name:就是队列的名字啦,其中 queue_lesson就是我们程序中新建队列

    Number Of Pending Messages:是等待消费的消息,因为我们只运行了生产者,而且只产生了一条消息,因此队列中有一条未消费的消息。

    Number Of Consumers:当前运行着的消费者,我们还没有

    Messages Enqueued :进入队列的消息,我们只产生了一次,也只有一条消息

    Message Dequeued:出了队列的消息,指被消费的消息

    Views:查看当前队列的一些信息

    Operations:对当前队列的一些操作

    在这里,我们单击Browse连接:

    在这里,我们能看到当前队列中的消息

    Message ID:应该是自动生成的,还不了解

    Correlation ID:这个以后再研究,他主要是用来关联多个Message,例如需要回复一个消息时,通常把回复的消息的JMSCorrelationID设置为原来消息的ID

    Persistence:是否持久化,我们在代码里,没有设置持久化

    Priority:权重,默认应该为4

    Redelivered:消息是否被重发

    Reply To:回复,以后会说到

    TimeStamp:消息的时间戳

    Type:消息类型

    Operations:操作

    下面,我们点击Message ID 连接,进入到消息的详细界面:

    这里,可以看到,消息的内容和消息头信息

    好了,到这里,我们就可以运行消费者了,先回到最开始的界面:

    运行消费者,之后,控制台输出:

    我们接受到了消息。

    刷新界面:

    可以看到,这里的内容变了,

    因为消息被我们消费了,所以被消费消息加1,而且,当前消费者还在运行,所以有一个消费者。

  • 相关阅读:
    C#类头部声明样式
    VisualStudio使用技巧及快捷键
    #使用ListView更新数据出现闪烁解决办法
    获取公网IP地址
    JArray数组每个JObject对象添加一个键值对
    部署网站出现System.ServiceModel.Activation.HttpModule错误
    MYSQL存储引擎的比较
    数据库索引原理(转载)
    皮尔逊相关系数
    进程与线程
  • 原文地址:https://www.cnblogs.com/wangshouchang/p/8474018.html
Copyright © 2011-2022 走看看