zoukankan      html  css  js  c++  java
  • 关于MQ 消息队列的通俗理解和 rabbitMQ 使用

     消息队列,一听很高大上,现在很多分布式系统都在用这个消息中间件 网上一搜, 说的都是些原理。

     说下我的通俗理解, 你网上买了, 快递员给你投递, 会出现什么问题呢? 1  你不定时在家, 快递员 来了几次你都不在,不能到你手里。 2. 快递员很忙,手里一堆货物, 最后送到你手里就很慢了。

     有问题就要解决,1: 你不定时在家? 1 送到你的房东家,然后你回去挨个找,  2 放到小区的 快递投递点,那里很多家 门牌号的 小柜子 ,你的东西就在那, 你自己去取 。

                           2 : 快递员很忙? 把快递放在仓库, 哪个快递员有空闲就去给你送。

     这里对应的就是生产消费了。 我们知道, RPC 分布式服务, 把我们的业务系统都给划分了, 而不是之前一个复杂的系统了, 用户?拆分。 订单?拆分。 交易?拆分。基础服务?拆分。关系?拆分。各司其职。 分布式系统好处利大于弊。

     各个业务 明确自己干什么, 另外可以针对性的提示性能 比如 用户业务访问的明显比订单多, 用户 业务 服务器加 三台, 如果你是一个复杂的大系统就不好调试了, 跑题了。 各业务必然有关联, 用户注册。必须要通过基础服务的 发短信, 推送。

      这里就必须要中间件了,消息队列 有两种方式,  广播 ,  排队,小区快递柜对应的各家的, 就是广播, 你自己拿你自己的。 不耽误快递员时间, 不耽误你时间, 也不耽误别人的时间, 排队,就是放在 房东家, 自己挨个去找。 不管如何, 这种异步方式都是

    极大的提升了效率。

    这里放个RabbitMQ 的简单示例, 现在各种中间件 都不错, 各有所长。

    1 启动类

    public class RechargeApplication implements CommandLineRunner {
        private static final Logger logger = LoggerFactory.getLogger(RechargeApplication.class);
    
        @Autowired
        private Environment env;
    
        public static void main(String[] args) {
            SpringApplication.run(RechargeApplication.class, args);
    //启动的时候注册消费者, 一旦 注册, 消费者就会监听 队列,有数据就去拿,基于AMQP协议 , 举个例子, 你的快递被放到小区快递箱了, 手机收到短信, 你的快递到了。 快去A503 快递箱收取, 验证码 123456. SpringBeanHelper.getBean(Consumer.
    class).payConsumer(); } public void run(String... strings) throws Exception { logger.info("Coupon service started, run at {} environment", env.getActiveProfiles()); } }
    SpringBeanHelper

    public class SpringBeanHelper implements ApplicationContextAware {
        private static final Logger LOG = LoggerFactory.getLogger(SpringBeanHelper.class);
        protected static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) {
            SpringBeanHelper.applicationContext = applicationContext;
        }
    
        public static <T> T getBean(Class<T> requiredClass) {
            try {
                return applicationContext.getBean(requiredClass);
            } catch (NoSuchBeanDefinitionException e) {
                LOG.warn("no qualifying bean of type: {}", requiredClass);
                return null;
            }
        }
    
        public static <T> T getBean(String beanName, Class<T> requiredClass) {
            try {
                return applicationContext.getBean(beanName, requiredClass);
            } catch (NoSuchBeanDefinitionException e) {
                LOG.warn("no qualifying bean of name: {}, type: {}", beanName, requiredClass);
                return null;
            }
        }
    
        public static String projectDir(){
            return System.getProperty("user.dir");
        }
    
        public static String contextPath() {
            String basePath = applicationContext.getEnvironment().getProperty("server.context-path");
            return StringUtils.isBlank(basePath) ? "" : basePath;
        }
    
        public static String applicationName() {
            return applicationContext.getEnvironment().getProperty("spring.application.name");
        }
    
        public static Integer applicationPort(){
            return Integer.parseInt(applicationContext.getEnvironment().getProperty("server.port"));
        }
    
        public static String applicationEnv() {
            String[] envList = applicationContext.getEnvironment().getActiveProfiles();
            if (!CollectionUtils.isNullOrEmpty(envList)) {
                return envList[0];
            }
            return "";
        }
    
        /**
         * 判断给定的 taskNum 是不是本机要同步的房间
         */
        public static boolean isMineTask(long taskNum) {
            DiscoveryClient discoveryClient = applicationContext.getBean(DiscoveryClient.class);
            //通过服务发现取所有本服务的实例
            List<ServiceInstance> serviceList = discoveryClient.getInstances(applicationName());
            if (CollectionUtils.isNullOrEmpty(serviceList)) {
                return false;
            }
            SortedSet<EurekaDiscoveryClient.EurekaServiceInstance> esiSet = Sets.newTreeSet((o1, o2) -> {
                if (NetworkUtils.ip2long(o1.getHost()) < NetworkUtils.ip2long(o2.getHost())) {
                    return 1;
                } else if(NetworkUtils.ip2long(o1.getHost()) == NetworkUtils.ip2long(o2.getHost())){
                    if(o1.getPort() < o2.getPort()){
                        return 1;
                    }else {
                        return -1;
                    }
                } else {
                    return -1;
                }
            });
            //过滤状态为UP的实例并按IP排序
            for (ServiceInstance si : serviceList) {
                if (si instanceof EurekaDiscoveryClient.EurekaServiceInstance) {
                    EurekaDiscoveryClient.EurekaServiceInstance esi = (EurekaDiscoveryClient.EurekaServiceInstance) si;
                    if (InstanceInfo.InstanceStatus.UP == esi.getInstanceInfo().getStatus()) {
                        esiSet.add(esi);
                    }
                }
            }
            if (CollectionUtils.isNullOrEmpty(esiSet)) {
                return false;
            }
            int index = 0, esiSize = esiSet.size();
            long theIp = NetworkUtils.ip2long(NetworkUtils.ofInnerIp());
            if (1 == esiSet.size()) {
                return theIp == NetworkUtils.ip2long(esiSet.first().getHost());
            }
            //找到本机实例并且当 taskNum 对所有实例数取余等于本机实例 index 值时处理
            for (EurekaDiscoveryClient.EurekaServiceInstance esi : esiSet) {
                if (NetworkUtils.ip2long(esi.getHost()) == theIp && index == taskNum % esiSize) {
                    return true;
                }
                index++;
            }
            return false;
        }
    }

    3 。生成者 , 快递员:

        public void producer(Enum topic, Object message) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("rabbit producer topic: {}, message: {}", topic, JsonUtils.toJSONString(message));
            }
            try {
                AMQP.BasicProperties props = propertyOf(message);
                senderChannel.queueDeclare(topic.name(), true, false, false, null);
                this.senderChannel.basicPublish("", topic.name(), props, JsonUtils.toJSONBytes(message));
            } catch (Exception e) {
                throw new RuntimeException("rabbitmq send message error.....", e);
            }
        }

    4 消费者

    public class Consumer {
    
    
        public static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    
        @Autowired
        private RabbitOperations rabbitOperations;
    
        @Autowired
        private RechargeService rechargeService;
    
        /**
         * 支付成功回调
         */
        public void payConsumer() {
            rabbitOperations.consumer(TradeCallBackTopic.CHARGE, 3, (contentType, body) -> {
                String content = StringUtils.toEncodedString(body, Charset.forName("UTF-8"));
                LOGGER.info("=======消费者:payConsumer start, content:{}============", content);
                CallBackDto dto = JSON.parseObject(content, CallBackDto.class);
                if (dto.getCallBackType() == PaymentConstant.CallBackNotifyType.PAY
                        && dto.getTradeStatus() == PaymentConstant.TradeStatus.SUCCESS) {
                }
    
            });
    
    
        }
    
    
    }

             

     

  • 相关阅读:
    奥运圣火在家乡传递
    Please stop reinventing the wheel (请不要重复发明轮子)
    使用IDispatch::Invoke函数在C++中调用C#实现的托管类库方法
    To invoke and to begin invoke, that is a question.
    XML和JSON(JavaScript Object Notation)
    Cloud Computing Is a Big Whiteboard
    TRIE Data Structure
    ASP.NET AJAX UpdatePanel 控件实现剖析
    分布式计算、网格计算和云计算
    系统架构设计师考试大纲(2009版)
  • 原文地址:https://www.cnblogs.com/zgghb/p/6830977.html
Copyright © 2011-2022 走看看