zoukankan      html  css  js  c++  java
  • 关于MQ 消息队列的通俗理解和 rabbitMQ 使用

     消息队列,一听很高大上,现在很多分布式系统都在用这个消息中间件 网上一搜, 说的都是些原理。

     说下我的通俗理解, 你网上买了, 快递员给你投递, 会出现什么问题呢? 1  你不定时在家, 快递员 来了几次你都不在,不能到你手里。 2. 快递员很忙,手里一堆货物, 最后送到你手里就很慢了。

     有问题就要解决,1: 你不定时在家? 1 送到你的房东家,然后你回去挨个找,  2 放到小区的 快递投递点,那里很多家 门牌号的 小柜子 ,你的东西就在那, 你自己去取 。

                           2 : 快递员很忙? 把快递放在仓库, 哪个快递员有空闲就去给你送。

     这里对应的就是生产消费了。 我们知道, RPC 分布式服务, 把我们的业务系统都给划分了, 而不是之前一个复杂的系统了, 用户?拆分。 订单?拆分。 交易?拆分。基础服务?拆分。关系?拆分。各司其职。 分布式系统好处利大于弊。

     各个业务 明确自己干什么, 另外可以针对性的提示性能 比如 用户业务访问的明显比订单多, 用户 业务 服务器加 三台, 如果你是一个复杂的大系统就不好调试了, 跑题了。 各业务必然有关联, 用户注册。必须要通过基础服务的 发短信, 推送。

      这里就必须要中间件了,消息队列 有两种方式,  广播 ,  排队,小区快递柜对应的各家的, 就是广播, 你自己拿你自己的。 不耽误快递员时间, 不耽误你时间, 也不耽误别人的时间, 排队,就是放在 房东家, 自己挨个去找。 不管如何, 这种异步方式都是

    极大的提升了效率。

    这里放个RabbitMQ 的简单示例, 现在各种中间件 都不错, 各有所长。

    1 启动类

    public class RechargeApplication implements CommandLineRunner {
        private static final Logger logger = LoggerFactory.getLogger(RechargeApplication.class);
    
        @Autowired
        private Environment env;
    
        public static void main(String[] args) {
            SpringApplication.run(RechargeApplication.class, args);
    //启动的时候注册消费者, 一旦 注册, 消费者就会监听 队列,有数据就去拿,基于AMQP协议 , 举个例子, 你的快递被放到小区快递箱了, 手机收到短信, 你的快递到了。 快去A503 快递箱收取, 验证码 123456. SpringBeanHelper.getBean(Consumer.
    class).payConsumer(); } public void run(String... strings) throws Exception { logger.info("Coupon service started, run at {} environment", env.getActiveProfiles()); } }
    SpringBeanHelper

    public class SpringBeanHelper implements ApplicationContextAware {
        private static final Logger LOG = LoggerFactory.getLogger(SpringBeanHelper.class);
        protected static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) {
            SpringBeanHelper.applicationContext = applicationContext;
        }
    
        public static <T> T getBean(Class<T> requiredClass) {
            try {
                return applicationContext.getBean(requiredClass);
            } catch (NoSuchBeanDefinitionException e) {
                LOG.warn("no qualifying bean of type: {}", requiredClass);
                return null;
            }
        }
    
        public static <T> T getBean(String beanName, Class<T> requiredClass) {
            try {
                return applicationContext.getBean(beanName, requiredClass);
            } catch (NoSuchBeanDefinitionException e) {
                LOG.warn("no qualifying bean of name: {}, type: {}", beanName, requiredClass);
                return null;
            }
        }
    
        public static String projectDir(){
            return System.getProperty("user.dir");
        }
    
        public static String contextPath() {
            String basePath = applicationContext.getEnvironment().getProperty("server.context-path");
            return StringUtils.isBlank(basePath) ? "" : basePath;
        }
    
        public static String applicationName() {
            return applicationContext.getEnvironment().getProperty("spring.application.name");
        }
    
        public static Integer applicationPort(){
            return Integer.parseInt(applicationContext.getEnvironment().getProperty("server.port"));
        }
    
        public static String applicationEnv() {
            String[] envList = applicationContext.getEnvironment().getActiveProfiles();
            if (!CollectionUtils.isNullOrEmpty(envList)) {
                return envList[0];
            }
            return "";
        }
    
        /**
         * 判断给定的 taskNum 是不是本机要同步的房间
         */
        public static boolean isMineTask(long taskNum) {
            DiscoveryClient discoveryClient = applicationContext.getBean(DiscoveryClient.class);
            //通过服务发现取所有本服务的实例
            List<ServiceInstance> serviceList = discoveryClient.getInstances(applicationName());
            if (CollectionUtils.isNullOrEmpty(serviceList)) {
                return false;
            }
            SortedSet<EurekaDiscoveryClient.EurekaServiceInstance> esiSet = Sets.newTreeSet((o1, o2) -> {
                if (NetworkUtils.ip2long(o1.getHost()) < NetworkUtils.ip2long(o2.getHost())) {
                    return 1;
                } else if(NetworkUtils.ip2long(o1.getHost()) == NetworkUtils.ip2long(o2.getHost())){
                    if(o1.getPort() < o2.getPort()){
                        return 1;
                    }else {
                        return -1;
                    }
                } else {
                    return -1;
                }
            });
            //过滤状态为UP的实例并按IP排序
            for (ServiceInstance si : serviceList) {
                if (si instanceof EurekaDiscoveryClient.EurekaServiceInstance) {
                    EurekaDiscoveryClient.EurekaServiceInstance esi = (EurekaDiscoveryClient.EurekaServiceInstance) si;
                    if (InstanceInfo.InstanceStatus.UP == esi.getInstanceInfo().getStatus()) {
                        esiSet.add(esi);
                    }
                }
            }
            if (CollectionUtils.isNullOrEmpty(esiSet)) {
                return false;
            }
            int index = 0, esiSize = esiSet.size();
            long theIp = NetworkUtils.ip2long(NetworkUtils.ofInnerIp());
            if (1 == esiSet.size()) {
                return theIp == NetworkUtils.ip2long(esiSet.first().getHost());
            }
            //找到本机实例并且当 taskNum 对所有实例数取余等于本机实例 index 值时处理
            for (EurekaDiscoveryClient.EurekaServiceInstance esi : esiSet) {
                if (NetworkUtils.ip2long(esi.getHost()) == theIp && index == taskNum % esiSize) {
                    return true;
                }
                index++;
            }
            return false;
        }
    }

    3 。生成者 , 快递员:

        public void producer(Enum topic, Object message) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("rabbit producer topic: {}, message: {}", topic, JsonUtils.toJSONString(message));
            }
            try {
                AMQP.BasicProperties props = propertyOf(message);
                senderChannel.queueDeclare(topic.name(), true, false, false, null);
                this.senderChannel.basicPublish("", topic.name(), props, JsonUtils.toJSONBytes(message));
            } catch (Exception e) {
                throw new RuntimeException("rabbitmq send message error.....", e);
            }
        }

    4 消费者

    public class Consumer {
    
    
        public static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);
    
        @Autowired
        private RabbitOperations rabbitOperations;
    
        @Autowired
        private RechargeService rechargeService;
    
        /**
         * 支付成功回调
         */
        public void payConsumer() {
            rabbitOperations.consumer(TradeCallBackTopic.CHARGE, 3, (contentType, body) -> {
                String content = StringUtils.toEncodedString(body, Charset.forName("UTF-8"));
                LOGGER.info("=======消费者:payConsumer start, content:{}============", content);
                CallBackDto dto = JSON.parseObject(content, CallBackDto.class);
                if (dto.getCallBackType() == PaymentConstant.CallBackNotifyType.PAY
                        && dto.getTradeStatus() == PaymentConstant.TradeStatus.SUCCESS) {
                }
    
            });
    
    
        }
    
    
    }

             

     

  • 相关阅读:
    命令拷屏之网络工具
    PHP 设计模式 笔记与总结(1)命名空间 与 类的自动载入
    Java实现 计蒜客 1251 仙岛求药
    Java实现 计蒜客 1251 仙岛求药
    Java实现 计蒜客 1251 仙岛求药
    Java实现 蓝桥杯 算法训练 字符串合并
    Java实现 蓝桥杯 算法训练 字符串合并
    Java实现 蓝桥杯 算法训练 字符串合并
    Java实现 LeetCode 143 重排链表
    Java实现 LeetCode 143 重排链表
  • 原文地址:https://www.cnblogs.com/zgghb/p/6830977.html
Copyright © 2011-2022 走看看