zoukankan      html  css  js  c++  java
  • Jms消费者模式

    JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成一消息,并进行发送,对应的消费者在接收到对应的消息后去完成对应的业务逻辑。

    Apache官网上下载activeMQ(http://activemq.apache.org/download.html),解压运行bin目录下activemq.bat文件启动activeMQ

    对于消息的传递有两种类型,一种是点对点的,即一个生产者和一个消费者一一对应;另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

    1.      点对点的消息模式(Point to Point Messaging)

    下面的JMS对象在点对点消息模式中是必须的:

    a.      队列(Queue) – 一个提供者命名的队列对象,客户端将会使用这个命名的队列对象

    b.     队列链接工厂(QueueConnectionFactory) – 客户端使用队列链接工厂创建链接队列

            ConnectionQueue来取得与JMS点对点消息提供者的链接。

    c.      链接队列(ConnectionQueue) – 一个活动的链接队列存在在客户端与点对点消息提供者之

            间,客户用它创建一个或者多个JMS队列会话(QueueSession)

    d.     队列会话(QueueSession) – 用来创建队列消息的发送者与接受者(QueueSenderand

             QueueReceiver)

    e.     消息发送者(QueueSender 或者MessageProducer)– 发送消息到已经声明的队列

    f.       消息接受者(QueueReceiver或者MessageConsumer) – 接受已经被发送到指定队列的消息

    2.      发布订阅模式(publish – subscribe Mode)

    a.      主题Topic(Destination) – 一个提供者命名的主题对象,客户端将会使用这个命名的主题对象

    b.     主题链接工厂(TopciConnectionFactory) – 客户端使用主题链接工厂创建链接主题

             ConnectionTopic来取得与JMS消息Pub/Sub提供者的链接。

    c.      链接主题(ConnectionTopic) – 一个活动的链接主题存在发布者与订阅者之间

    d.     会话(TopicSession) – 用来创建主题消息的发布者与订阅者 (TopicPublisher  and

             TopicSubscribers)

    e.     消息发送者MessageProducer) – 发送消息到已经声明的主题

    f.       消息接受者(MessageConsumer) – 接受已经被发送到指定主题的消息

    以感知数据为例子

    activemq.properties配置文件:

    topic=csp.jxmessages
    ipaddress=10.100.70.102
    #ipaddress=localhost
    port=61616
    username=user
    password=user

    消费者模式:

    package com.ship;
    
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.Properties;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.springframework.stereotype.Controller;
    import com.common.utils.PropertyLoader;
    
    @Controller
    public class JmsConsumer implements MessageListener {
        private java.sql.Connection con = null;
        private static JmsConsumer instance = null;
    
        public JmsConsumer() {
            if (instance != null)
                throw new RuntimeException();
            instance = this;
        }
    
        public JmsConsumer getInstance() {
            return instance;
        }
    
        @Override
        public void onMessage(Message message) {
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                try {
                    String content = txtMsg.getText();
                    System.out.println("数据是:
    " + content);
                    String[] arrcontent = content.split("
    ");
                    // con.setAutoCommit(false);
                    // Statement statement = con.createStatement();
                    for (int i = 0; i < arrcontent.length; i++) {
                        String[] shipinfo = arrcontent[i].split(",");
                        // System.out.println(shipinfo[0]); //测试船名
                        // 存储过程
                        // String sql = "call ganzhi()";
                        // 添加批处理
                        // statement.addBatch(sql);
                    }
                    // 批处理 因为存在读写延迟
                    // statement.executeBatch();
                    // con.commit();
                    // statement.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        @PostConstruct
        public void init() {
            try {
                // 数据库连接
                buildSqlCon();
                // jms处理
                buildJmsCon();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        private void buildSqlCon() {
            // TODO Auto-generated method stub
            Properties jdbcprop = PropertyLoader.getPropertiesFromClassPath(
                    "jdbc.properties", "UTF-8");
            String driverClassName = jdbcprop.getProperty("jdbc.driverClassName");
            String jdbcurl = jdbcprop.getProperty("jdbc.url");
            String uname = jdbcprop.getProperty("jdbc.username");
            String pwd = jdbcprop.getProperty("jdbc.password");
            try {
                Class.forName(driverClassName);
                con = DriverManager.getConnection(jdbcurl + "&user=" + uname
                        + "&password=" + pwd);
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
    
        }
    
        public void buildJmsCon() {
            // 消费者的主要流程
            Connection connection = null;
            Properties properties = PropertyLoader.getPropertiesFromClassPath(
                    "activemq.properties", "UTF-8");
            String topic = properties.getProperty("topic");
            String username = properties.getProperty("username");
            String password = properties.getProperty("password");
            String ipaddress = properties.getProperty("ipaddress");
            String port = properties.getProperty("port");
            String brokerURL = "failover://tcp://" + ipaddress + ":" + port;
            try {
                // 1.初始化connection工厂
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        username, password, brokerURL);
    
                // 2.创建Connection
                connection = connectionFactory.createConnection();
    
                // 3.打开连接
                connection.start();
    
                System.out.println("连接成功..................");
    
                // 4.创建session
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
    
                // 5.创建消息目标
                Destination destination = session.createTopic(topic);
    
                // 6.创建消费者
                MessageConsumer consumer = session.createConsumer(destination);
    
                // 7.配置监听
                consumer.setMessageListener(getInstance());
    
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
        @PreDestroy
        public void destroy() {
            if (con != null)
                try {
                    con.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
        }
    
    }

    测试:

    数据是:
    浙余杭货01803,120.16185,30.098183,0.0,140,140,2016-06-02 09:40:13:000,,WM2HZ803244,1,0
    浙富阳货00759,119.844083,29.932533,null,0,0,2016-06-02 09:40:13:000,,WM2HZ804658,1,0
    浙长兴货1952,120.320117,30.630767,0.0,0,0,2016-06-02 09:40:13:000,,WM2HZ820795,1,0
    浙富阳货00759,119.844083,29.932533,0.0,0,0,2016-06-02 09:40:13:000,,WM2HZ804658,1,0
    浙越城货0677,118.859033,32.171567,14.0,50,50,2016-06-02 09:40:13:000,,WM2HZ814170,1,0
    浙钱江货35077,120.122567,30.400967,0.0,340,340,2016-06-02 09:40:13:000,,WM2HZ802275,1,0
    浙越城货0677,118.859033,32.171567,14.0,50,50,2016-06-02 09:40:13:000,,WM2HZ814170,1,0
    浙余杭货01667,120.12385,30.402033,0.0,20,20,2016-06-02 09:40:13:000,,WM2HZ801640,1,0
    浙越城货0670,120.736333,32.040183,0.0,210,210,2016-06-02 09:40:14:000,,WM2HZ817006,1,0
    浙安吉货2389,120.149133,30.636817,0.0,220,220,2016-06-02 09:40:14:000,,WM2HZ811204,1,0
    浙钱江货00628,120.053833,30.047217,8.0,210,210,2016-06-02 09:40:14:000,,WM2HZ800153,1,0
    浙绍运6-25,120.473033,30.6393,6.0,230,230,2016-06-02 09:40:14:000,,WM2HZ801842,1,0
    浙上虞货0553,119.705217,29.82325,null,330,330,2016-06-02 09:40:14:000,,WM2HZ801018,1,0
    浙余杭货01782,120.07065,30.399117,0.0,20,20,2016-06-02 09:40:14:000,,WM2HZ801764,1,0
    浙桐庐货00483,120.217017,30.269617,0.0,20,20,2016-06-02 09:40:14:000,,WM2HZ803645,1,0
    汇海集026,120.1237,30.401283,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ811220,1,0
    海源186,121.394967,31.485683,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ815369,1,0
    任强1号,120.085533,30.0672,0.0,30,30,2016-06-02 09:40:14:000,,WM2HZ818227,1,0
    金顺668,120.166217,30.125183,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ819091,1,0
    浙富阳货00636,119.705533,29.823683,null,200,200,2016-06-02 09:40:14:000,,WM2HZ804624,1,0
    浙临安游026,119.769083,30.230167,0.0,0,0,2016-06-02 09:40:14:000,,WM2HZ817016,1,0
    浙萧山货23922,120.29145,30.501783,7.0,80,80,2016-06-02 09:40:14:000,,WM2HZ818998,1,0
    浙萧山货23922,120.29145,30.501783,null,80,80,2016-06-02 09:40:14:000,,WM2HZ818998,1,0
    浙嘉善货03216,120.141717,30.355,0.0,280,280,2016-06-02 09:40:14:000,,WM2HZ800918,1,0
    浙萧山货03166,119.835467,29.907917,0.0,310,310,2016-06-02 09:40:14:000,,WM2HZ801393,1,0
    浙萧山货23751,120.156167,30.100917,0.0,100,100,2016-06-02 09:40:14:000,,WM2HZ802551,1,0
    合肥武运628,120.16455,30.125717,null,0,0,2016-06-02 09:40:14:000,,WM2HZ802141,1,0

  • 相关阅读:
    P4387 P4387 【深基15.习9】验证栈序列
    P1241 括号序列题解
    P2058 海港题解
    P1540 机器翻译题解
    leaflet + react + typescript
    TypeScript中文手册:从 JavaScript 迁移到 TypeScript
    react-esri-leaflet与typescript
    TypeError: Super expression must either be null or a function
    前端库(gis前端库和普通库分开)
    react-leaflet:Module parse failed: Unexpected token (10:41)
  • 原文地址:https://www.cnblogs.com/manusas/p/5551964.html
Copyright © 2011-2022 走看看