zoukankan      html  css  js  c++  java
  • activemq的使用

    一.常用的消息队列:

    1 activemq javaapache

    2 rabbitmq cmq

    3 kafuka 大数据mq

    4 zeromq 简单版的mq

    5 mateMq 基于amqp

    6 RocketMQ 阿里

    二.mq的使用

    解压和启动mq


    activeMq start

     

     三 .mq的角色

    producer消息的发送者

    Comsumer消息的消费者

    queue方式; 把消息发给activemq服务器,消费端监听到只要有一个执行完成其他就不会再执行了.

    topic方式: 把消息发给activemq服务器,消费端监听到都会执行.

     

     

    四: 加入Pom依赖

    <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-activemq</artifactId>
       <exclusions>
          <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
          </exclusion>
       </exclusions>
    </dependency>

    <dependency>
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-pool</artifactId>
       <version>5.15.2</version>
       <exclusions>
          <exclusion>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-log4j12</artifactId>
          </exclusion>
       </exclusions>
    </dependency>

     五:页面访问地址:localhost:8161  ,账号密码都是admin 

    六:整合activemq客户端到项目中

    @Configuration
    public class ActiveMQConfig {

    @Value("${spring.activemq.broker-url:disabled}")
    String brokerURL ;

    @Value("${activemq.listener.enable:disabled}")
    String listenerEnable;

    @Bean
    public ActiveMQUtil getActiveMQUtil() throws JMSException {
    if(brokerURL.equals("disabled")){
    return null;
    }
    ActiveMQUtil activeMQUtil=new ActiveMQUtil();
    activeMQUtil.init(brokerURL);
    return activeMQUtil;
    }

    //定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    @Bean(name = "jmsQueueListener")
    public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    if(!listenerEnable.equals("true")){
    return null;
    }

    factory.setConnectionFactory(activeMQConnectionFactory);
    //设置并发数
    factory.setConcurrency("5");

    //重连间隔时间
    factory.setRecoveryInterval(5000L);
    factory.setSessionTransacted(false);
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

    return factory;
    }


    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
    /* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
    url=brokerURL;
    }*/
    ActiveMQConnectionFactory activeMQConnectionFactory =
    new ActiveMQConnectionFactory( brokerURL);
    return activeMQConnectionFactory;
    }

    }

    public class ActiveMQUtil {
    PooledConnectionFactory pooledConnectionFactory=null;

    public ConnectionFactory init(String brokerUrl) {

    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
    //加入连接池
    pooledConnectionFactory=new PooledConnectionFactory(factory);
    //出现异常时重新连接
    pooledConnectionFactory.setReconnectOnException(true);
    //
    pooledConnectionFactory.setMaxConnections(5);
    pooledConnectionFactory.setExpiryTimeout(10000);
    return pooledConnectionFactory;
    }

    public ConnectionFactory getConnectionFactory(){
    return pooledConnectionFactory;
    }
    }

    例子: 
    支付controller 层支付成功后,发送系统消息 =》 更新订单状态 锁定商品库存 物流订单等等,例如在支付的service层发送消息告诉订单需要更改状态, 然后在订单服务里面写component注解,里面实现监听更改业务即可.
    @RequestMapping("alipay/callback/return")
    public String alipay_callback(Model model, HttpServletRequest request){

    String alipay_trade_no = request.getParameter("trade_no");//支付宝的交易单号
    String order_sn = request.getParameter("out_trade_no");// 外部订单号total_amount
    String pay_amount = request.getParameter("total_amount");

    // 更新支付信息
    PaymentInfo paymentInfo = new PaymentInfo();
    // 交易单号
    // 支付状态
    String payment_status = "已支付";
    // 回调内容
    String callback_content = request.getQueryString();
    // 回调时间
    Date callback_time = new Date();

    paymentInfo.setOrderSn(order_sn);
    paymentInfo.setPaymentStatus(payment_status);
    paymentInfo.setCallbackTime(callback_time);
    paymentInfo.setAlipayTradeNo(alipay_trade_no);
    paymentInfo.setCallbackContent(callback_content);
    paymentInfo.setTotalAmount(new BigDecimal(pay_amount));
    paymentService.updatePayment(paymentInfo);

    // 发送系统消息 =》 更新订单状态 锁定商品库存 物流订单等等
    paymentService.sendPaymentResult(paymentInfo);

    return "finish";
    }


    @Autowired
    ActiveMQUtil activeMQUtil;

    @Override
    public void sendPaymentResult(PaymentInfo paymentInfo) {

    ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();

    Connection connection = null;
    Session session = null;// 开启消息事务
    Queue paymentResultQueue = null; // 队列
    MessageProducer producer = null;
    try {
    connection = connectionFactory.createConnection();
    connection.start();
    session = connection.createSession(true, Session.SESSION_TRANSACTED);
    paymentResultQueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");
    //text文本格式,map键值格式
    MapMessage mapMessage=new ActiveMQMapMessage();
    mapMessage.setString("out_trade_no",paymentInfo.getOrderSn());
    mapMessage.setDouble("pay_amount",paymentInfo.getTotalAmount().doubleValue());
    producer = session.createProducer(paymentResultQueue);// 消息的生成者
    producer.send(mapMessage);
    session.commit();
    } catch (JMSException e) {
    e.printStackTrace();
    }finally {
    try {
    producer.close();
    session.close();
    connection.close();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }


    @Component
    public class OrderConsumer {

    @Autowired
    OrderService orderService;

    @JmsListener(containerFactory = "jmsQueueListener",destination = "PAYMENT_SUCCESS_QUEUE")
    public void consumePaymentSuccess(MapMessage mapMessage) throws JMSException {
    String out_trade_no = mapMessage.getString("out_trade_no");
    Double pay_amount = mapMessage.getDouble("pay_amount");

    // 根据支付状态,更新订单信息
    OmsOrder omsOrder = new OmsOrder();
    omsOrder.setPayAmount(new BigDecimal(pay_amount));
    omsOrder.setPaymentTime(new Date());
    omsOrder.setOrderSn(out_trade_no);
    omsOrder.setStatus("1");
    orderService.updateOrder(omsOrder);

    System.out.println("已监听到"+out_trade_no+"号订单,订单消费PAYMENT_SUCCESS_QUEUE队列");

    }

    }


  • 相关阅读:
    hihocoder #1138 : Islands Travel
    关于c中的inline
    LUOGU P2921 [USACO08DEC]在农场万圣节Trick or Treat on the Farm
    LUOGU P1908 逆序对
    归并排序 (模板)
    tyvj 1864 守卫者的挑战
    loj #10001. 「一本通 1.1 例 2」种树
    bzoj 1026: [SCOI2009]windy数
    BZOJ 4521: [Cqoi2016]手机号码
    LUOGU 3089 后缀排序(模板)
  • 原文地址:https://www.cnblogs.com/liuyi13535496566/p/11706149.html
Copyright © 2011-2022 走看看