zoukankan      html  css  js  c++  java
  • ActiveMQ的简单使用

    项目中使用的介绍

    一、运行ActiveMQ

    在文件路径 ...\apache-activemq-5.13.3\bin\win64 下

    运行activemq.bat 

    这样ActiveMQ就在本机上启动了,运行后还可以通过浏览器访问相应的管理页面(默认地址为 http://127.0.0.1:8161/admin )。

    二、项目中的实践

    1.依赖的jar包

    <!-- Spring JMS support plus the ActiveMQ broker, pool and Spring-integration modules.
         NOTE(review): no <version> elements are given; presumably the versions are managed
         elsewhere (e.g. a parent POM or a dependencyManagement section) - confirm.
         NOTE(review): the artemis-* artifacts belong to ActiveMQ Artemis, a different broker
         from the ActiveMQ 5.x used in this article; presumably they are not required by the
         code shown here - confirm before keeping them. -->
    <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-broker</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-spring</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>artemis-jms-client</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>artemis-jms-server</artifactId>
            </dependency>

    2.在common.properties文件中配置ActiveMQ的连接参数,要和正在运行的ActiveMQ实例保持一致。

    # Broker address and credentials referenced by the Spring configuration
    # (${activemq.broker.url}, ${activemq.user}, ${activemq.password}).
    activemq.broker.url=tcp://127.0.0.1:61616
    activemq.user=admin
    activemq.password=admin
    # Presumably queue names - confirm against the code that reads them.
    # NOTE(review): the producer example in this article sends to "itom.activemq.access",
    # which matches neither value below - confirm which queue names are actually used.
    activemq.name=datacenter.test
    activemq.name.access=datacenter.access

    3.applicationContext-mq.xml 中配置

    <!-- 1. Configure the ActiveMQ connection factory (broker URL and credentials come from properties) -->
        <amq:connectionFactory id="creditMQConnectionFactory"
                               brokerURL="${activemq.broker.url}" userName="${activemq.user}" password="${activemq.password}"  />
    
        <!-- 2. Configure the caching ConnectionFactory, which wraps the factory defined in step 1.
             NOTE(review): this id ("creditMQconnectionFactory", lower-case "c") differs from the
             raw factory id ("creditMQConnectionFactory") only by letter case; they are easy to mix up. -->
        <bean id="creditMQconnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="creditMQConnectionFactory"/>
            <property name="sessionCacheSize" value="100" />
        </bean>
    
        <!-- Define the JmsTemplate for the Queue domain (pubSubDomain is false).
             NOTE(review): the constructor-arg references the raw "creditMQConnectionFactory",
             not the caching factory from step 2 - confirm that this is intended. -->
        <bean id="creditJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <constructor-arg ref="creditMQConnectionFactory" />
            <property name="pubSubDomain" value="false" />
        </bean>
    
        <!-- Define the authorization queue consumer (commented out in this snippet)
        <bean id="creditConsumer" class="com.itom.mq.consumer.CreditConsumer"/>-->
    
        <!-- Define the Queue listener container.
             NOTE(review): no <jms:listener> child element is declared here, so this snippet
             registers no listener - confirm where the consumer is wired in. -->
        <jms:listener-container destination-type="queue" container-type="default" connection-factory="creditMQconnectionFactory" acknowledge="auto">    
        </jms:listener-container>
        <!-- End of the operations-related configuration -->

    4.编写生产者和消费者

      1) 生产者

    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import javax.jms.Queue;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    
    /**
     * Producer that sends messages to ActiveMQ queues through the injected
     * {@code creditJmsTemplate}.
     *
     * <p>Queue destinations are created on first use and cached by queue name, so repeated
     * sends to the same queue reuse one {@link Queue} object.
     */
    @Component
    public class CreditProducer {

        /** Queue destinations created so far, keyed by queue name. */
        private final ConcurrentMap<String, Queue> cacheQueue = new ConcurrentHashMap<String, Queue>();

        @Resource
        private JmsTemplate creditJmsTemplate;

        /**
         * Converts {@code message} with the template's message converter and sends it to the
         * queue named {@code queueName}.
         *
         * @param queueName name of the target queue; must not be {@code null}
         *                  (the backing {@link ConcurrentHashMap} rejects null keys)
         * @param message   payload handed to {@code JmsTemplate.convertAndSend}
         */
        public void pushCreditData(String queueName, Object message) {
            Queue queue = cacheQueue.get(queueName);
            if (queue == null) {
                Queue created = new ActiveMQQueue(queueName);
                // putIfAbsent makes the insert atomic: if another thread registered the same
                // name first, its instance wins and ours is discarded.
                Queue existing = cacheQueue.putIfAbsent(queueName, created);
                queue = (existing != null) ? existing : created;
            }
            this.creditJmsTemplate.convertAndSend(queue, message);
        }
    }

      2) 消费者:

    /**
     * JMS message listener that handles {@link TextMessage}s: it reads the text payload,
     * prints it to standard output and parses it with {@code JSONObject.parse}.
     * Messages of any other type are ignored.
     *
     * NOTE(review): the article elides the rest of the handler (the "..." line), so this
     * snippet is illustrative and does not compile as shown.
     */
    public class MessageReceiver implements MessageListener {
    
        // Custom class written by the author; it uses a thread pool to process messages.
        // NOTE(review): the field is not used in the visible part of onMessage; presumably
        // the elided code hands the parsed message to it - confirm.
        @Resource(name = "messageComputePool")
        MessageComputePool pool;
    
        /**
         * Handles one incoming message.
         *
         * A JMSException raised while reading the text is only printed via
         * printStackTrace(); it is not rethrown.
         *
         * @param message the received JMS message; only TextMessage instances are processed
         */
        public void onMessage(Message message) {
            if(message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    String text = textMessage.getText();
                    System.out.println(String.format("Received: %s",text));
                    // The payload is parsed as JSON and cast to a JSONObject.
                    JSONObject jsonObj = (JSONObject)JSONObject.parse(text);
                    ...
                    // Once the consumer has received the message, it can be processed here.
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    
    
        }
    }

      3)使用生产者推消息

    @Service("collectService ")
    public class CollectServiceImpl implements CollectService {
          public static final Logger LOG  = LoggerFactory.getLogger(CollectServiceImpl.class);
    
          @Resource
          private CreditProducer creditProducer;
    
          @Override
          public void collect(CollectParameter parameter) {
                Map dataMap = new HashMap();
                dataMap.put("sourceCode", parameter.getValue());
                dataMap.put("userId",parameter.getUserid());           
                creditProducer.pushCreditData("itom.activemq.access", JSON.toJSONString(dataMap));
            }
        }
    }

    推荐个ActiveMQ入门教程:

    http://blog.csdn.net/jolingogo/article/category/1658165

  • 相关阅读:
    Redis与Memcached的incr/decr差异对比
    Linux sudo用法与配置
    Docker 常用命令
    Linux之间配置SSH互信(SSH免密码登录)
    SVN服务器搭建
    shell中参数的传递
    【代码更新】IIC协议建模——读写EEPROM
    串口完整项目之串口收发字符串
    串口发送模块——1字节数据发送
    状态机设计——从简单的按键消抖开始
  • 原文地址:https://www.cnblogs.com/itommy/p/10610332.html
Copyright © 2011-2022 走看看