zoukankan      html  css  js  c++  java
  • hadoop15---activemq

    java JMS技术
    JMS是规范,activeMQ是实现。
    
    用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
    
    它类似于JDBC,JDBC 是可以用来访问许多不同关系数据库的 API。IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ。
    
    JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。
    消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。
    
    根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

    一个主题就是一个队列。

    JMS规范
    体系架构
    JMS由以下元素组成。
    JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
    JMS客户:生产者消费者。
    JMS生产者:创建并发送消息的JMS客户。
    JMS消费者:接收消息的JMS客户。
    JMS消息:包括可以在JMS客户之间传递的数据的对象
    JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
    JMS主题:一种支持发送消息给多个订阅者的机制。

    Java消息服务应用程序结构支持两种模型

    1、 点对点或叫做队列模型

    在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。

    2、发布者/订阅者模型

    发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。

    一个主题就是一个队列。

    这种模式被概括为:

    多个消费者可以获得消息

    在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

    代码演示
    1.下载ActiveMQ
    去官方网站下载:http://activemq.apache.org/安装。
    有web管理页面。
    消息服务是跨语言的。
    
    2.运行ActiveMQ
    解压缩apache-activemq-5.5.1-bin.zip,
    修改配置文件activeMQ.xml,将0.0.0.0修改为localhost。
    下面是不同的协议:
    <transportConnectors>
           <transportConnector name="openwire" uri="tcp://localhost:61616"/>
           <transportConnector name="ssl"     uri="ssl://localhost:61617"/>
           <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
          <transportConnector uri="http://localhost:8081"/>
           <transportConnector uri="udp://localhost:61618"/>
    然后双击apache-activemq-5.5.1inactivemq.bat运行ActiveMQ程序。
    启动ActiveMQ以后,启动了一个jetty服务器,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。
    
    activeMQ对消息记录是没有做持久化的,先开启消费者再开启生产者。kafka是做了持久化的,可以设置清除时间。activeMQ对于小的消息可以,对于大数据还是要用kafka,kafka有集群。
    
    Kafka本质没有完全实现JMS。Kafka消息是不保证顺序的。Kafka官网也说不是一个消息队列,只是一个消息缓存。Kafka是一个缓冲池。
    常用的JMS实现
    开源的提供者包括:
    Apache ActiveMQ、JBoss 社区所研发的 HornetQ、Joram、Coridan的MantaRay、The OpenJMS Group的OpenJMS
    专有的提供者包括:
    BEA的BEA WebLogic Server JMS、TIBCO Software的EMS、GigaSpaces Technologies的GigaSpaces、Softwired 2006的iBus
    IONA Technologies的IONA JMS、SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)、webMethods的JMS+ -my-channels的Nirvana、Sonic Software的SonicMQ、SwiftMQ的SwiftMQ、IBM的WebSphere MQ

    生产者:

    package cn.itcast_03_mq.topic;
    import java.util.Random;
    
    import javax.jms.JMSException;      
    
    public class ProducerTest {      
         
        /**    
         * @param args    
         */     
        public static void main(String[] args) throws JMSException, Exception {      
            ProducerTool producer = new ProducerTool(); 
            Random random = new Random();
            for(int i=0;i<2;i++){
                
                Thread.sleep(random.nextInt(1)*1000);
                
                producer.produceMessage("Hello, world333!--"+i);      
                producer.close();  //刷新管理页面可以看到
            }
            
        }      
    }      
    
    
    
    package cn.itcast_03_mq.topic;
    import javax.jms.Connection;      
    import javax.jms.DeliveryMode;      
    import javax.jms.Destination;      
    import javax.jms.JMSException;      
    import javax.jms.MessageProducer;      
    import javax.jms.Session;      
    import javax.jms.TextMessage;      
         
    import org.apache.activemq.ActiveMQConnection;      
    import org.apache.activemq.ActiveMQConnectionFactory;      
         
    public class ProducerTool {        
        private String user = ActiveMQConnection.DEFAULT_USER;         
        private String password = ActiveMQConnection.DEFAULT_PASSWORD;       
        private String url = "tcp://192.168.88.128:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;       
        private String subject = "eee";      
        private Destination destination = null;      
        private Connection connection = null;      
        private Session session = null;      
        private MessageProducer producer = null;
        // 初始化      
        private void initialize() throws JMSException, Exception {      
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                    user, password, url);      
            connection = connectionFactory.createConnection();      
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
            destination = session.createTopic(subject);   //主题   
            producer = session.createProducer(destination);      
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //发送模式     
        }
        // 发送消息      
        public void produceMessage(String message) throws JMSException, Exception {      
            initialize();      
            TextMessage msg = session.createTextMessage(message);      
            connection.start();      
            System.out.println("Producer:->Sending message: " + message);      
            producer.send(msg);      
            System.out.println("Producer:->Message sent complete!");      
        }
        // 关闭连接      
        public void close() throws JMSException {      
            System.out.println("Producer:->Closing connection");      
            if (producer != null)      
                producer.close();      
            if (session != null)      
                session.close();      
            if (connection != null)      
                connection.close();      
        }      
    }        

    消费者:

    package cn.itcast_03_mq.topic;
    
    import javax.jms.JMSException;
    
    public class ConsumerTest implements Runnable {
        static Thread t1 = null;
    
        /**
         * @param args
         * @throws InterruptedException
         * @throws InterruptedException
         * @throws JMSException
         * @throws InterruptedException
         */
        public static void main(String[] args) throws InterruptedException {
    
            t1 = new Thread(new ConsumerTest());
            t1.setDaemon(false);
            t1.start();
            /**
             * 如果发生异常,则重启consumer
             */
            /*while (true) {
                System.out.println(t1.isAlive());
                if (!t1.isAlive()) {
                    t1 = new Thread(new ConsumerTest());
                    t1.start();
                    System.out.println("重新启动");
                }
                Thread.sleep(5000);
            }*/
            // 延时500毫秒之后停止接受消息
            // Thread.sleep(500);
            // consumer.close();
        }
    
        public void run() {
            try {
                ConsumerTool consumer = new ConsumerTool();
                consumer.consumeMessage();
                while (ConsumerTool.isconnection) {    
                }
            } catch (Exception e) {
            }
    
        }
    }
    
    
    
    package cn.itcast_03_mq.topic;
    import javax.jms.Connection;      
    import javax.jms.Destination;      
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;      
    import javax.jms.MessageConsumer;      
    import javax.jms.Session;      
    import javax.jms.MessageListener;      
    import javax.jms.Message;      
    import javax.jms.TextMessage;      
         
    import org.apache.activemq.ActiveMQConnection;      
    import org.apache.activemq.ActiveMQConnectionFactory;      
         
    public class ConsumerTool implements MessageListener,ExceptionListener {      
        private String user = ActiveMQConnection.DEFAULT_USER;      
        private String password = ActiveMQConnection.DEFAULT_PASSWORD;      
        private String url ="failover://tcp://192.168.88.128:61616";//ActiveMQConnection.DEFAULT_BROKER_URL;      
        private String subject = "eee";      
        private Destination destination = null;      
        private Connection connection = null;      
        private Session session = null;      
        private MessageConsumer consumer = null;  
        public static Boolean isconnection=false;
        // 初始化      
        private void initialize() throws JMSException, Exception {      
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(      
                    user, password, url);      
            connection = connectionFactory.createConnection();      
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);      
            destination = session.createTopic(subject);      
            consumer = session.createConsumer(destination);     
        }      
         
        // 消费消息      
        public void consumeMessage() throws JMSException, Exception {      
            initialize();      
            connection.start();
            consumer.setMessageListener(this);    //监听,调用onMessage方法处理。
            connection.setExceptionListener(this);
            isconnection=true;
            System.out.println("Consumer:->Begin listening...");      
            // 开始监听  
            Message message = consumer.receive();   
            System.out.println("message:::"+message);
        }
        // 关闭连接      
        public void close() throws JMSException {      
            System.out.println("Consumer:->Closing connection");      
            if (consumer != null)      
                consumer.close();      
            if (session != null)      
                session.close();      
            if (connection != null)      
                connection.close();      
        }
        // 消息处理函数      
        public void onMessage(Message message) {      
            try {      
                if (message instanceof TextMessage) {      
                    TextMessage txtMsg = (TextMessage) message;      
                    String msg = txtMsg.getText();      
                    System.out.println("Consumer:->Received: " + msg);      
                } else {      
                    System.out.println("Consumer:->Received: " + message);      
                }      
            } catch (JMSException e) {      
                // TODO Auto-generated catch block      
                e.printStackTrace();      
            }      
        }
    
        public void onException(JMSException arg0) {
            isconnection=false;
        }      
    }      
         
  • 相关阅读:
    GNU make manual 翻译(九十九)
    GNU make manual 翻译( 九十五)
    Shell的 for 循环小例子
    makefile中对目录遍历的小例子
    GNU make manual 翻译(九十三)
    GNU make manual 翻译( 一百)
    GNU make manual 翻译( 九十七)
    GNU make manual 翻译( 九十八)
    mapserver4.8.3 的readme.win32的中文翻译文件
    遥控器编程
  • 原文地址:https://www.cnblogs.com/yaowen/p/9024327.html
Copyright © 2011-2022 走看看