zoukankan      html  css  js  c++  java
  • rabbitMQ应用,laravel生产广播消息,springboot消费消息

    最近在做一个新需求:用户发布动态后,前台需要查询这些动态。为了让用户读取信息时响应更快(直接查询 MySQL 很难实现,或者说实现起来很慢),在用户动态发布成功后,利用消息机制异步构建 Redis 缓存和 Elasticsearch 索引。

    开发环境

    rabbitMQ服务端,docker安装

    拉取rabbit-mq镜像
    docker pull hub.c.163.com/library/rabbitmq:3.6.10-management
    
    运行镜像
    docker run -d --name rabbitmq --publish 5671:5671  --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672  hub.c.163.com/library/rabbitmq:3.6.10-management
    
    后台地址:
    http://192.168.1.8:15672

    消息生产端(PHP):

    composer 安装 rabbitmq客户端
    composer require php-amqplib/php-amqplib
    
    
    生产广播消息官方demo
    https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_publisher_fanout.php

    应用中代码

    <?php
    /**
     * User: szliugx@gmail.com
     * Date: 2018/6/18
     * Time: 下午1:54
     */
    
    namespace App\ThirdParty\Message;
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    /**
     * Publishes plain-text messages to the 'message.fanout.exchange' fanout exchange.
     *
     * A new connection and channel are opened for every call to send() and are
     * always closed again, even when declaring the exchange or publishing fails.
     */
    class AmqpPublisher
    {
        /**
         * Broadcast one message to the fanout exchange.
         *
         * @param string $content Message body; sent with content_type "text/plain".
         *
         * @return void
         */
        public function send($content)
        {
            $exchange = 'message.fanout.exchange';

            // Open the connection using the broker settings from the app config.
            $connection = new AMQPStreamConnection(
                config('app.mq_host'),
                config('app.mq_port'),
                config('app.mq_user'),
                config('app.mq_pass'),
                config('app.mq_vhost')
            );

            try {
                $channel = $connection->channel();

                try {
                    /*
                        name: $exchange
                        type: fanout
                        passive: false     // create the exchange if it does not exist yet
                        durable: true      // the exchange survives broker restarts
                        auto_delete: false // the exchange is kept when it is no longer in use

                        NOTE(review): these flags presumably have to match the exchange
                        declared by the consumer side; confirm against that declaration.
                    */
                    $channel->exchange_declare($exchange, 'fanout', false, true, false);

                    $message = new AMQPMessage($content, array('content_type' => 'text/plain'));
                    // No routing key is passed; only the exchange name is given.
                    $channel->basic_publish($message, $exchange);
                } finally {
                    // Close the channel even if declare/publish threw.
                    $channel->close();
                }
            } finally {
                // Close the connection even if opening or using the channel threw.
                $connection->close();
            }
        }
    }

    消息消费端(Java):

    引入maven依赖
    
    <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
        

    配置广播队列信息

    package cn.taxiong.release.config;
    
    import cn.taxiong.release.constant.QueueConstants;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    /**
     * RabbitMQ fanout-mode configuration.
     *
     * <p>Declares two queues (cache and index), one fanout exchange, and one binding
     * from each queue to that exchange. All names come from {@link QueueConstants}.
     *
     * @author szliugx@gmail.com
     * @create 2018-06-18 4:04 PM
     **/
    @Slf4j
    @Configuration
    public class RabbitMQFanoutConfig {

        /**
         * Declares the queue that receives messages for the cache consumer.
         *
         * @return queue named {@link QueueConstants#MESSAGE_QUEUE_RELEASE_CACHE_NAME}
         */
        @Bean
        public Queue createFanoutQueueCache() {
            log.info( "创建了FanoutQueue cache 缓存 队列" );
            return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME);
        }

        /**
         * Declares the queue that receives messages for the index consumer.
         *
         * @return queue named {@link QueueConstants#MESSAGE_QUEUE_RELEASE_INDEX_NAME}
         */
        @Bean
        public Queue createFanoutQueueIndex() {
            // Log text fixed: it previously said "缓存" (cache), copied from the cache queue bean.
            log.info( "创建了FanoutQueue index 索引 队列" );
            return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME);
        }

        /**
         * Declares the fanout exchange both queues are bound to.
         *
         * @return exchange named {@link QueueConstants#MESSAGE_FANOUT_EXCHANGE}
         */
        @Bean
        public FanoutExchange fanoutExchangeRelease() {
            log.info( "创建了fanoutExchange交换机" );
            return new FanoutExchange( QueueConstants.MESSAGE_FANOUT_EXCHANGE);
        }

        /**
         * Binds the cache queue to the fanout exchange.
         *
         * @return binding between {@link #createFanoutQueueCache()} and {@link #fanoutExchangeRelease()}
         */
        @Bean
        public Binding fanoutExchangeCacheQueueBinding() {
            log.info( "将FanoutQueue cache 队列绑定到交换机fanoutExchange" );
            return BindingBuilder.bind( createFanoutQueueCache() ).to( fanoutExchangeRelease() );
        }

        /**
         * Binds the index queue to the fanout exchange.
         *
         * @return binding between {@link #createFanoutQueueIndex()} and {@link #fanoutExchangeRelease()}
         */
        @Bean
        public Binding fanoutExchangeIndexQueueBinding() {
            log.info( "将FanoutQueue index 队列绑定到交换机fanoutExchange" );
            return BindingBuilder.bind( createFanoutQueueIndex() ).to( fanoutExchangeRelease() );
        }
    }

    队列常量信息

    package cn.taxiong.release.constant;
    
    /**
     * Names of the RabbitMQ exchange and queues used for release messages.
     *
     * @author szliugx@gmail.com
     * @create 2018-06-14 7:02 PM
     **/
    public interface QueueConstants {

        /** Name of the fanout exchange that messages are published to. */
        String MESSAGE_FANOUT_EXCHANGE = "message.fanout.exchange";

        /** Name of the queue carrying release messages for the cache consumer. */
        String MESSAGE_QUEUE_RELEASE_CACHE_NAME = "message.release.cache.queue";

        /** Name of the queue carrying release messages for the index consumer. */
        String MESSAGE_QUEUE_RELEASE_INDEX_NAME = "message.release.index.queue";
    }


    缓存(cache)服务消费消息:

    package cn.taxiong.release.message;
    
    import cn.taxiong.release.constant.QueueConstants;
    import cn.taxiong.release.service.OperateReleaseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    /**
     * Message consumer for the cache queue.
     *
     * <p>Listens on the queue named by
     * {@link QueueConstants#MESSAGE_QUEUE_RELEASE_CACHE_NAME} and currently only logs
     * each message it receives.
     *
     * @author szliugx@gmail.com
     * @create 2018-06-14 7:14 PM
     **/
    @Slf4j
    @Component
    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME)
    public class MessageConsumer {
    
        // Injected service; not used while the call in handler() stays commented out.
        @Autowired
        private OperateReleaseService operateReleaseService;
    
        /**
         * Handles one message taken from the cache queue.
         *
         * @param message message payload received as a String
         */
        @RabbitHandler
        public void handler(@Payload String message) {
            // Disabled call: presumably this is where the Redis cache would be built
            // from the message. TODO confirm before re-enabling.
            // operateReleaseService.storeReleaseRedisCache(message);
            log.info("缓存cache消息消费1:{}", message);
        }
    }


    索引(index)服务消费消息:

    package cn.taxiong.release.message;
    
    import cn.taxiong.release.constant.QueueConstants;
    import cn.taxiong.release.service.OperateReleaseService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    /**
     * Message consumer for the index queue.
     *
     * <p>Listens on the queue named by
     * {@link QueueConstants#MESSAGE_QUEUE_RELEASE_INDEX_NAME} and currently only logs
     * each message it receives.
     *
     * @author szliugx@gmail.com
     * @create 2018-06-14 7:14 PM
     **/
    @Slf4j
    @Component
    @RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME)
    public class MessageConsumer2 {
    
        // Injected service; handler() does not use it.
        // NOTE(review): presumably meant for building the search index; confirm.
        @Autowired
        private OperateReleaseService operateReleaseService;
    
        /**
         * Handles one message taken from the index queue.
         *
         * @param message message payload received as a String
         */
        @RabbitHandler
        public void handler(@Payload String message) {
            log.info("索引消息 index 消费2:{}", message);
        }
    }

  • 相关阅读:
    NYOJ--1058--dfs--部分和问题
    js中数组的操作方法
    eval()函数
    ES6 对象新增方法 object.is() object.assign()
    vue 中的nextTick
    vue vue-cli创建项目步骤方法
    node express创建项目步骤
    get post put delete
    vue中的状态管理 vuex store
    vue的实例属性$options
  • 原文地址:https://www.cnblogs.com/liugx/p/9196067.html
Copyright © 2011-2022 走看看