zoukankan      html  css  js  c++  java
  • spring boot RedisMQ——使用RedisTemplate实现生产者消费者模式

    1、配置Redis

    在application.properties文件中加入redis的配置信息

    # Redis connection settings; consumed as ${gmall.redis.*} placeholders by the
    # redisConnectionFactory bean in spring-redis.xml.
    # NOTE(review): 172.16.19.259 is not a valid IPv4 address (last octet exceeds 255) -
    # replace it with the real Redis host before use.
    gmall.redis.host=172.16.19.259
    gmall.redis.port=6379
    # NOTE(review): plaintext credential - keep the real password out of version control.
    gmall.redis.pass=Gworld2017
    # Bound to the connection factory's "database" property.
    gmall.redis.database=7
    # Bound to the connection factory's "timeout" property; presumably milliseconds - TODO confirm.
    gmall.redis.timeout=5000

    配置spring-redis.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task"
        xmlns:redis="http://www.springframework.org/schema/redis"
        xsi:schemaLocation="
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
                http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
                http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"
        default-lazy-init="false">
        <!-- Jedis-based connection factory. Host, port, password, timeout and database index
             are all resolved from the gmall.redis.* property placeholders. Marked primary so it
             wins when more than one candidate of the same type is available for autowiring. -->
        <bean id="redisConnectionFactory"
            class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
            primary="true">
            <property name="hostName" value="${gmall.redis.host}" />
            <property name="port" value="${gmall.redis.port}" />
            <property name="password" value="${gmall.redis.pass}" />
            <property name="timeout" value="${gmall.redis.timeout}" />
            <property name="database" value="${gmall.redis.database}" />
        </bean>
        <!-- RedisTemplate wired to redisConnectionFactory. Keys and hash keys are serialized with
             StringRedisSerializer; values and hash values with JdkSerializationRedisSerializer.
             RedisUtil looks this bean up by the name "redisTemplate", so the id must not change. -->
        <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
            primary="true">
            <property name="connectionFactory" ref="redisConnectionFactory" />
            <property name="exposeConnection" value="true" />
            <property name="keySerializer">
                <bean
                    class="org.springframework.data.redis.serializer.StringRedisSerializer" />
            </property>
            <property name="valueSerializer">
                <bean
                    class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
            </property>
            <property name="hashKeySerializer">
                <bean
                    class="org.springframework.data.redis.serializer.StringRedisSerializer" />
            </property>
            <property name="hashValueSerializer">
                <bean
                    class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
            </property>
        </bean>
    </beans>

    2、编写RedisUtil类

    /**
     * Static convenience wrappers around the Spring-managed {@code redisTemplate} bean for the
     * set and list operations used by the producer/consumer examples.
     *
     * <p>The template is resolved by bean name through {@code SpringBeanFactoryUtils} on every
     * call, so these methods only work after the Spring context has been handed to that class.
     */
    public class RedisUtil {
        private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);

        /**
         * Looks up the bean named {@code redisTemplate}.
         *
         * @return the template as a raw type; callers in this class suppress the resulting
         *         unchecked warnings
         */
        @SuppressWarnings("rawtypes")
        private static RedisTemplate getRedisTemplate() {
            return (RedisTemplate) SpringBeanFactoryUtils.getBean("redisTemplate");
        }

        /**
         * Adds {@code value} to the set stored at {@code redisKey}.
         *
         * @param redisKey key of the set
         * @param value    member to add
         * @return whatever the template's set {@code add} operation reports
         */
        @SuppressWarnings("unchecked")
        public static Long addRedisSet(String redisKey, Object value) {
            Long result = getRedisTemplate().opsForSet().add(redisKey, value);
            // Parameterized logging: no string is built unless debug is enabled.
            logger.debug("result={}", result);
            return result;
        }

        /**
         * Pushes {@code value} onto the left end of the list stored at {@code key}.
         *
         * @param key   key of the list
         * @param value element to push
         */
        @SuppressWarnings("unchecked")
        public static void leftPush(String key, String value) {
            getRedisTemplate().opsForList().leftPush(key, value);
        }

        /**
         * Pops an element from the right end of the list stored at {@code key}, passing the
         * given timeout through to the template's timed {@code rightPop}.
         *
         * @param key     key of the list
         * @param timeout timeout value handed to the template
         * @param unit    unit of {@code timeout}
         * @return the popped element cast to {@code String}; may be {@code null}
         * @throws ClassCastException if the stored element is not a {@code String}
         */
        @SuppressWarnings("unchecked")
        public static String rightPop(String key, long timeout, TimeUnit unit) {
            return (String) getRedisTemplate().opsForList().rightPop(key, timeout, unit);
        }

        /**
         * Pops an element from the right end of {@code sourceKey} and pushes it onto the left end
         * of {@code destinationKey} via the template's {@code rightPopAndLeftPush}.
         *
         * @param sourceKey      list to pop from
         * @param destinationKey list to push to
         * @return the moved element as returned by the template
         */
        @SuppressWarnings("unchecked")
        public static Object rightPopAndLeftPush(String sourceKey, String destinationKey) {
            return getRedisTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey);
        }
    }
    RedisUtil依赖的SpringBeanFactoryUtils类(该类必须在Spring配置中注册为bean,容器才会调用setApplicationContext注入ApplicationContext,否则getBean无法使用)

    package com.gcard.queue.utils;
    
    import org.springframework.beans.BeansException;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    public class SpringBeanFactoryUtils   implements ApplicationContextAware {    
        private static ApplicationContext appCtx;    
        /**  
         * 此方法可以把ApplicationContext对象inject到当前类中作为一个静态成员变量。  
         * @param applicationContext ApplicationContext 对象.  
         * @throws BeansException  
         * @author wangdf 
         */    
         
        public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException {    
            appCtx = applicationContext;    
        }  
           
        /** 
         * 获取ApplicationContext 
         * @return 
         * @author wangdf 
         */  
        public static ApplicationContext getApplicationContext(){  
            return appCtx;  
        }  
           
        /**  
         * 这是一个便利的方法,帮助我们快速得到一个BEAN  
         * @param beanName bean的名字  
         * @return 返回一个bean对象  
         * @author wangdf 
         */    
        public static Object getBean( String beanName ) {
            return appCtx.getBean( beanName );
        }    
        @SuppressWarnings("unchecked")
        public static Object getBean( Class requiredType ) {
            return appCtx.getBean(requiredType);
        }    
    }

    3、模拟生产者

    /**
     * Demo producer: pushes five string tasks ({@code value_0} .. {@code value_4}) onto the
     * {@code task-queue} list through {@code RedisUtil.leftPush}, logging each one. Any exception
     * ends the run and is logged.
     */
    public class TaskProducer implements Runnable {
        Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public void run() {
            try {
                int index = 0;
                while (index < 5) {
                    publish(index);
                    index++;
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        /** Builds the task value for {@code index}, pushes it and logs the insertion. */
        private void publish(int index) {
            String task = "value_" + index;
            RedisUtil.leftPush("task-queue", task);
            logger.info("插入一个新的任务:" + task);
        }

    }

    4、模拟消费者

    /**
     * Demo consumer: repeatedly moves one task from {@code task-queue} to {@code tmp-queue}
     * (so the task is parked while it is being handled) and then removes it from
     * {@code tmp-queue} once handling has succeeded.
     *
     * <p>The loop runs until the executing thread is interrupted.
     */
    public class TaskConsumer implements Runnable {
        private static final String TASK_QUEUE = "task-queue";
        private static final String TMP_QUEUE = "tmp-queue";
        /** Pause used both when the task queue is empty and after a failure. */
        private static final long IDLE_POLL_MILLIS = 1000L;
        /** Upper bound for the clean-up pop so the consumer can never block forever. */
        private static final long TMP_POP_TIMEOUT_SECONDS = 1L;

        Logger logger = LoggerFactory.getLogger(getClass());

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Move one task to the temporary queue. This call does not wait: it yields
                    // null when the task queue is empty.
                    String taskid = (String) RedisUtil.rightPopAndLeftPush(TASK_QUEUE, TMP_QUEUE);
                    if (taskid == null) {
                        // Nothing to do. The original went on to pop tmp-queue with timeout 0
                        // (wait forever), which stalled the consumer permanently once the task
                        // queue had drained; poll again after a short pause instead.
                        pause(IDLE_POLL_MILLIS);
                        continue;
                    }

                    // Handling succeeded: drop the parked entry from the temporary queue.
                    // NOTE(review): with several consumers sharing tmp-queue this pop may remove
                    // another consumer's entry - confirm whether a per-consumer queue is needed.
                    // On failure the task could instead be returned with
                    // RedisUtil.rightPopAndLeftPush("tmp-queue", "task-queue").
                    String str = RedisUtil.rightPop(TMP_QUEUE, TMP_POP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    logger.info("线程取数据:{}", str);
                    logger.info("{}处理成功,被清除", str);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    // Back off so a persistent failure does not turn into a tight error loop.
                    pause(IDLE_POLL_MILLIS);
                }
            }
        }

        /**
         * Sleeps for the given time. If interrupted, restores the interrupt flag so the loop in
         * {@link #run()} terminates on its next check.
         */
        private static void pause(long millis) {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

    }

    5、独立消费者作为一个项目(监听器)

    在application-context.xml文件配置bean,启动项目后就会执行这个监听器

    <!-- Registers MessageQueueServiceImpl and has the container invoke its messageListener()
         method as the bean's init-method.
         NOTE(review): the class is not shown here; if messageListener() loops on the calling
         thread it would hold up context startup - confirm it hands the work to its own thread. -->
    <bean class="com.gcard.longcode.manager.impl.MessageQueueServiceImpl" init-method="messageListener"/>
  • 相关阅读:
    POJ 3468 A Simple Problem with Integers
    BZOJ 4430 Guessing Camels
    POJ 2309 BST
    POJ 1990 MooFest
    cf 822B Crossword solving
    cf B. Black Square
    cf 828 A. Restaurant Tables
    Codefroces 822C Hacker, pack your bags!
    [HDU 2255] 奔小康赚大钱
    [BZOJ 1735] Muddy Fields
  • 原文地址:https://www.cnblogs.com/ouyanxia/p/9449458.html
Copyright © 2011-2022 走看看