zoukankan      html  css  js  c++  java
  • zbb20190430 springboot 配置alimq

     源码地址

    https://pan.baidu.com/disk/home?errno=0&errmsg=Auth%20Login%20Sucess&&bduss=&ssnerror=0&traceid=#/all?vmode=list&path=%2Fcode%2Fmq%2Falimq

    1.

    2.pom

    <!-- Maven build descriptor for the AliMq Spring Boot demo application. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.zbb.mq.app</groupId>
        <artifactId>AliMq</artifactId>
        <!-- Packaged as a WAR; the artifact file name comes from <finalName> below. -->
        <packaging>war</packaging>
        <version>0.0.1-SNAPSHOT</version>
        <name>AliMq Maven Webapp</name>
        <url>http://maven.apache.org</url>
    
        <!-- Inherits from the Spring Boot starter parent; the two Spring Boot
             starters below declare no version of their own. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.5.RELEASE</version>
        </parent>
        <dependencies>
            <!-- Web starter: used by the REST controller in this project. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- Test starter, limited to the test scope. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- Aliyun ONS (message queue) client; the version is pinned explicitly here. -->
            <dependency>
                <groupId>com.aliyun.openservices</groupId>
                <artifactId>ons-client</artifactId>
                <version>1.7.8.Final</version>
            </dependency>
    
    
        </dependencies>
        <build>
            <finalName>AliMq</finalName>
        </build>
    </project>

    3. 启动类 AppAliMq

    package com.zbb.alimq.app;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * Entry point of the AliMq demo: boots the Spring application context
     * using this class as the primary configuration source.
     */
    @SpringBootApplication
    public class AppAliMq {
        /**
         * Starts the Spring Boot application.
         *
         * @param args command-line arguments, forwarded unchanged to Spring Boot
         */
        public static void main(String[] args) {
            final Class<?> primarySource = AppAliMq.class;
            SpringApplication.run(primarySource, args);
        }
    }

    4. 配置类 AliMqConfig

    package com.zbb.alimq.app.config;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.bean.ConsumerBean;
    import com.aliyun.openservices.ons.api.bean.ProducerBean;
    import com.aliyun.openservices.ons.api.bean.Subscription;
    import com.zbb.alimq.app.util.AliMQConsumerListener;
    
    /**
     * Spring configuration that exposes the Aliyun ONS producer and consumer as beans.
     *
     * <p>All settings are injected from {@code aliyunMq.*} properties. Both beans are
     * declared with {@code initMethod = "start"} and {@code destroyMethod = "shutdown"},
     * so the container starts and stops them together with the application context.
     *
     * <p>NOTE(review): {@code jconsumerId}, {@code flightDelayConsumerId},
     * {@code flightDelayTopic}, {@code tagArr}, {@code tagFlightDelay},
     * {@code suspendTimeMillis} and {@code maxReconsumeTimes} are injected but not used by
     * either bean in this class; they are kept because other code may read these public
     * fields - confirm before removing.
     */
    @Configuration
    public class AliMqConfig {
        @Value("${aliyunMq.producerId}")
        public String producerId;
    
        @Value("${aliyunMq.consumerId}")
        public String consumerId;
    
        @Value("${aliyunMq.jconsumerId}")
        public String jconsumerId;
    
        @Value("${aliyunMq.flightDelayConsumerId}")
        public String flightDelayConsumerId;
    
        @Value("${aliyunMq.accessKey}")
        public String accessKey;
    
        @Value("${aliyunMq.secretKey}")
        public String secretKey;
    
        @Value("${aliyunMq.topic}")
        public String topic;
    
        @Value("${aliyunMq.flightDelayTopic}")
        public String flightDelayTopic;
    
        @Value("${aliyunMq.tagDep}")
        public String tagDep;
    
        @Value("${aliyunMq.tagArr}")
        public String tagArr;
    
        @Value("${aliyunMq.tagFlightDelay}")
        public String tagFlightDelay;
    
        @Value("${aliyunMq.onsAddr}")
        public String onsAddr;
    
        // Send timeout, in milliseconds (passed to the producer as a string property).
        @Value("${aliyunMq.sendMsgTimeoutMillis}")
        public String sendMsgTimeoutMillis;
    
        @Value("${aliyunMq.suspendTimeMillis}")
        public String suspendTimeMillis;
    
        @Value("${aliyunMq.maxReconsumeTimes}")
        public String maxReconsumeTimes;
    
        /**
         * Builds the message producer bean.
         *
         * @return a {@link ProducerBean} configured with the producer id, credentials,
         *         send timeout and ONS address; the container invokes {@code start} on it
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ProducerBean getProducer() {
            Properties properties = connectionProperties();
            properties.put(PropertyKeyConst.ProducerId, producerId);
            properties.put(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis);
    
            ProducerBean producerBean = new ProducerBean();
            producerBean.setProperties(properties);
            return producerBean;
        }
    
        /**
         * Builds the message consumer bean, subscribed to {@code topic} filtered by the
         * {@code tagDep} expression and handled by a new {@link AliMQConsumerListener}.
         *
         * @return a {@link ConsumerBean} carrying the connection properties and the
         *         single-entry subscription table; the container invokes {@code start} on it
         */
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        public ConsumerBean getConsumer() {
            Properties properties = connectionProperties();
            properties.put(PropertyKeyConst.ConsumerId, consumerId);
    
            Subscription subscription = new Subscription();
            subscription.setTopic(topic);
            subscription.setExpression(tagDep);
    
            // Typed map (the original used a raw HashMap, which triggers an unchecked warning).
            Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
            subscriptionTable.put(subscription, new AliMQConsumerListener());
    
            ConsumerBean consumerBean = new ConsumerBean();
            consumerBean.setProperties(properties);
            consumerBean.setSubscriptionTable(subscriptionTable);
            return consumerBean;
        }
    
        /**
         * Creates a fresh {@link Properties} holding the settings shared by producer and
         * consumer: access key, secret key and ONS address.
         */
        private Properties connectionProperties() {
            Properties properties = new Properties();
            // AccessKey / SecretKey: Aliyun credentials, created in the Aliyun management console.
            properties.put(PropertyKeyConst.AccessKey, accessKey);
            properties.put(PropertyKeyConst.SecretKey, secretKey);
            properties.put(PropertyKeyConst.ONSAddr, onsAddr);
            return properties;
        }
    
    }

    5. 消费者监听器 AliMQConsumerListener

    package com.zbb.alimq.app.util;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    
    /**
     * Message listener that decodes each received message body as UTF-8 text and logs it.
     */
    public class AliMQConsumerListener implements MessageListener {
        private static final Logger logger = LoggerFactory.getLogger(AliMQConsumerListener.class);
    
        /**
         * Handles one delivered message.
         *
         * @param message the delivered message; its body is decoded as UTF-8
         * @param context the consume context (not used here)
         * @return {@link Action#CommitMessage} when the message was processed,
         *         {@link Action#ReconsumeLater} when any exception was thrown while processing
         */
        @Override
        public Action consume(Message message, ConsumeContext context) {
            String msg = "";
            try {
                // Charset constant instead of the "UTF-8" name: no lookup by string at runtime.
                msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                logger.info("订阅消息:{}", msg);
                return Action.CommitMessage;
            } catch (Exception e) {
                // Consumption failed: log at ERROR and keep the exception so the cause
                // is visible, then request redelivery.
                logger.error("消费失败:{}", msg, e);
                return Action.ReconsumeLater;
            }
        }
    }

    6. 消息发送接口 AliMQUtil

    package com.zbb.alimq.app.util;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import com.zbb.alimq.app.config.AliMqConfig;
    
    /**
     * REST endpoint under {@code mq} that sends a fixed test message through the
     * configured Aliyun ONS producer.
     *
     * <p>{@code @RestController} already registers this class as a Spring component, so the
     * redundant {@code @Component} annotation was dropped.
     */
    @RestController
    @RequestMapping("mq")
    public class AliMQUtil {
        private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);
        @Autowired
        private AliMqConfig aliMQConfig;
    
        // Topic the message is sent to.
        @Value("${aliyunMq.topic}")
        public String topic;
        // Tag attached to the sent message.
        @Value("${aliyunMq.tagDep}")
        public String tagDep;
    
        /**
         * Sends the literal payload {@code "123"} to {@code topic} with tag {@code tagDep}
         * and logs the outcome. A failed send is logged and not rethrown.
         */
        @RequestMapping("sendMessage")
        public void sendMessage() {
            // NOTE(review): this calls the @Bean method on the injected configuration object;
            // presumably Spring's @Configuration proxying makes it return the shared, started
            // producer bean rather than a new instance - confirm.
            Producer producer = aliMQConfig.getProducer();
            // Encode explicitly as UTF-8 (the listener decodes as UTF-8) instead of relying
            // on the platform default charset.
            byte[] body = "123".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            Message msg = new Message(topic, tagDep, body);
            try {
                SendResult sendResult = producer.send(msg);
                if (sendResult != null) {
                    logger.info("消息发送成功:{}", sendResult);
                }
            } catch (ONSClientException e) {
                // An exception means the send failed; to avoid losing the message, consider
                // caching it and retrying. Logged at ERROR so failures are not lost in INFO noise.
                logger.error("消息发送失败:", e);
            }
        }
    }

    7.application.yml

    # Embedded server settings.
    server:
      port: 9080
      servlet:
        context-path: /
      tomcat:
        uri-encoding: UTF-8
        max-threads: 1000
        min-spare-threads: 30
    # Aliyun ONS settings, read through the ${aliyunMq.*} placeholders in the Java classes.
    # Every "*" is a redacted placeholder: replace it with your own value.
    # It must stay quoted, because a bare * starts a YAML alias and the file would not parse.
    aliyunMq:
      producerId: "*"
      consumerId: "*"
      jconsumerId: "*"
      accessKey: "*"
      secretKey: "*"
      tagDep: "*"
      tagArr: "*"
      topic: "*"
      onsAddr: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
      sendMsgTimeoutMillis: 3000
      suspendTimeMillis: 100
      maxReconsumeTimes: 20
      flightDelayConsumerId: "*"
      tagFlightDelay: "*"
      flightDelayTopic: "*"
  • 相关阅读:
    DGA域名可以是色情网站域名
    使用cloudflare加速你的网站隐藏你的网站IP
    167. Two Sum II
    leetcode 563. Binary Tree Tilt
    python 多线程
    leetcode 404. Sum of Left Leaves
    leetcode 100. Same Tree
    leetcode 383. Ransom Note
    leetcode 122. Best Time to Buy and Sell Stock II
    天津Uber优步司机奖励政策(12月28日到12月29日)
  • 原文地址:https://www.cnblogs.com/super-admin/p/10794937.html
Copyright © 2011-2022 走看看