zoukankan      html  css  js  c++  java
  • spring boot RedisMQ——使用RedisTemplate实现生产者消费者模式

    1、配置redis

    在application.properties文件中加入redis的配置信息

    #redis(下面的 host 仅为示例:259 超出了 IPv4 每段 0-255 的取值范围,请替换为实际的 Redis 地址)
    gmall.redis.host=172.16.19.259
    gmall.redis.port=6379
    gmall.redis.pass=Gworld2017
    gmall.redis.database=7
    gmall.redis.timeout=5000

    配置spring-redis.xml文件

    <?xml version="1.0" encoding="UTF-8"?>
    <!--
        Spring bean definitions for Redis access: one Jedis connection factory and one
        RedisTemplate built on top of it.
        NOTE(review): the aop, tx, task and redis namespaces are declared below but no element
        in this file uses them.
    -->
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
        xmlns:tx="http://www.springframework.org/schema/tx" xmlns:task="http://www.springframework.org/schema/task"
        xmlns:redis="http://www.springframework.org/schema/redis"
        xsi:schemaLocation="
                http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
                http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
                http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
                http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
                http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis-1.0.xsd"
        default-lazy-init="false">
        <!--
            Jedis-based connection factory. Host, port, password, timeout and database index
            come from the gmall.redis.* placeholders.
            NOTE(review): presumably these are resolved from application.properties; the
            property-placeholder configuration is not part of this file, so confirm it exists.
        -->
        <bean id="redisConnectionFactory"
            class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
            primary="true">
            <property name="hostName" value="${gmall.redis.host}" />
            <property name="port" value="${gmall.redis.port}" />
            <property name="password" value="${gmall.redis.pass}" />
            <property name="timeout" value="${gmall.redis.timeout}" />
            <property name="database" value="${gmall.redis.database}" />
        </bean>
        <!--
            The template looked up by bean id "redisTemplate". Keys and hash keys use
            StringRedisSerializer; values and hash values use JdkSerializationRedisSerializer.
        -->
        <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate"
            primary="true">
            <property name="connectionFactory" ref="redisConnectionFactory" />
            <property name="exposeConnection" value="true" />
            <!-- Plain keys are written as strings. -->
            <property name="keySerializer">
                <bean
                    class="org.springframework.data.redis.serializer.StringRedisSerializer" />
            </property>
            <!-- Plain values go through JDK serialization. -->
            <property name="valueSerializer">
                <bean
                    class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
            </property>
            <!-- Hash field names are written as strings. -->
            <property name="hashKeySerializer">
                <bean
                    class="org.springframework.data.redis.serializer.StringRedisSerializer" />
            </property>
            <!-- Hash field values go through JDK serialization. -->
            <property name="hashValueSerializer">
                <bean
                    class="org.springframework.data.redis.serializer.JdkSerializationRedisSerializer" />
            </property>
        </bean>
    </beans>

    2、编写RedisUtil类

    /**
     * Static convenience wrappers around the Spring {@code RedisTemplate} bean registered under
     * the id {@code "redisTemplate"}.
     *
     * <p>The template is looked up through {@code SpringBeanFactoryUtils} on every call, so these
     * methods can only be used after the Spring application context has been initialised.
     */
    public class RedisUtil {
        private static final Logger logger = LoggerFactory.getLogger(RedisUtil.class);

        /**
         * Fetches the shared template from the application context.
         *
         * @return the bean named {@code "redisTemplate"}, viewed with String keys and Object values
         */
        @SuppressWarnings("unchecked") // the bean is declared untyped in XML; the cast confines the warning to this lookup
        private static RedisTemplate<String, Object> getRedisTemplate() {
            return (RedisTemplate<String, Object>) SpringBeanFactoryUtils.getBean("redisTemplate");
        }

        /**
         * Adds a member to the Redis set stored at {@code redisKey}.
         *
         * @param redisKey key of the set
         * @param value    member to add
         * @return the value reported by the template's set {@code add} operation
         */
        public static Long addRedisSet(String redisKey, Object value) {
            Long result = getRedisTemplate().opsForSet().add(redisKey, value);
            // Parameterized logging already skips formatting when debug is disabled.
            logger.debug("result={}", result);
            return result;
        }

        /**
         * Pushes {@code value} onto the left (head) end of the list stored at {@code key}.
         *
         * @param key   key of the list
         * @param value element to push
         */
        public static void leftPush(String key, String value) {
            getRedisTemplate().opsForList().leftPush(key, value);
        }

        /**
         * Pops an element from the right (tail) end of the list stored at {@code key}, waiting
         * up to the given timeout for one to become available.
         *
         * @param key     key of the list
         * @param timeout how long to wait, in {@code unit}
         * @param unit    unit of {@code timeout}
         * @return the popped element, or whatever the template returns when nothing was popped
         *         (presumably {@code null} on timeout; confirm against the Spring Data Redis version in use)
         * @throws ClassCastException if the stored element is not a {@code String}
         */
        public static String rightPop(String key, long timeout, TimeUnit unit) {
            return (String) getRedisTemplate().opsForList().rightPop(key, timeout, unit);
        }

        /**
         * Pops the right-most element of {@code sourceKey} and pushes it onto the left end of
         * {@code destinationKey} in a single template call.
         *
         * @param sourceKey      key of the list to pop from
         * @param destinationKey key of the list to push onto
         * @return the moved element as returned by the template
         */
        public static Object rightPopAndLeftPush(String sourceKey, String destinationKey) {
            return getRedisTemplate().opsForList().rightPopAndLeftPush(sourceKey, destinationKey);
        }
    }
    SpringBeanFactoryUtils类

    package com.gcard.queue.utils;
    
    import org.springframework.beans.BeansException;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    
    public class SpringBeanFactoryUtils   implements ApplicationContextAware {    
        private static ApplicationContext appCtx;    
        /**  
         * 此方法可以把ApplicationContext对象inject到当前类中作为一个静态成员变量。  
         * @param applicationContext ApplicationContext 对象.  
         * @throws BeansException  
         * @author wangdf 
         */    
         
        public void setApplicationContext( ApplicationContext applicationContext ) throws BeansException {    
            appCtx = applicationContext;    
        }  
           
        /** 
         * 获取ApplicationContext 
         * @return 
         * @author wangdf 
         */  
        public static ApplicationContext getApplicationContext(){  
            return appCtx;  
        }  
           
        /**  
         * 这是一个便利的方法,帮助我们快速得到一个BEAN  
         * @param beanName bean的名字  
         * @return 返回一个bean对象  
         * @author wangdf 
         */    
        public static Object getBean( String beanName ) {
            return appCtx.getBean( beanName );
        }    
        @SuppressWarnings("unchecked")
        public static Object getBean( Class requiredType ) {
            return appCtx.getBean(requiredType);
        }    
    }

    3、模拟生产者

    /**
     * Demo producer: pushes five string tasks, {@code value_0} to {@code value_4}, onto the
     * {@code "task-queue"} list through {@code RedisUtil.leftPush}, logging each one.
     */
    public class TaskProducer implements Runnable {
        Logger logger = LoggerFactory.getLogger(getClass());

        /**
         * Enqueues the five tasks in order. Any exception ends the batch and is logged at
         * error level instead of propagating to the calling thread.
         */
        @Override
        public void run() {
            try {
                int seq = 0;
                while (seq < 5) {
                    String task = "value_" + seq;
                    RedisUtil.leftPush("task-queue", task);
                    logger.info("插入一个新的任务:" + task);
                    seq++;
                }
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
            }
        }

    }

    4、模拟消费者

    /**
     * Demo consumer: repeatedly moves one task from {@code "task-queue"} to {@code "tmp-queue"},
     * then removes it from {@code "tmp-queue"} and logs it as processed.
     *
     * <p>The loop runs until the executing thread is interrupted.
     */
    public class TaskConsumer implements Runnable {
        /** Pause, in milliseconds, before polling again when the queue is empty or a call failed. */
        private static final long IDLE_BACKOFF_MILLIS = 1000L;

        /** Upper bound, in seconds, on the blocking pop that clears the temporary queue. */
        private static final long CLEANUP_TIMEOUT_SECONDS = 1L;

        Logger logger = LoggerFactory.getLogger(getClass());

        /**
         * Polls the task queue and processes tasks one at a time.
         *
         * <p>{@code rightPopAndLeftPush} does not block and yields {@code null} when
         * {@code "task-queue"} is empty. In that case the loop backs off and polls again; it must
         * not wait on {@code "tmp-queue"}, because only this move ever fills that list and an
         * unbounded wait there would leave later tasks unconsumed.
         *
         * <p>Failure handling is not implemented: a task whose processing failed could be
         * returned with {@code RedisUtil.rightPopAndLeftPush("tmp-queue", "task-queue")}.
         */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Move the message into the temporary queue.
                    String taskid = (String) RedisUtil.rightPopAndLeftPush("task-queue", "tmp-queue");
                    if (taskid == null) {
                        // Nothing to do yet; wait before polling so an idle consumer does not spin.
                        backOff();
                        continue;
                    }

                    // The element was just moved, so this normally returns at once. The timeout
                    // is bounded so the loop cannot hang if another consumer already removed it.
                    String str = RedisUtil.rightPop("tmp-queue", CLEANUP_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                    if (str == null) {
                        logger.warn("tmp-queue was already empty while clearing task {}", taskid);
                        continue;
                    }
                    logger.info("线程取数据:{}", str);
                    logger.info(str + "处理成功,被清除");
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    // Avoid a tight error loop while Redis is unreachable.
                    backOff();
                }
            }
        }

        /** Sleeps for the back-off interval, restoring the interrupt flag if interrupted. */
        private static void backOff() {
            try {
                Thread.sleep(IDLE_BACKOFF_MILLIS);
            } catch (InterruptedException interrupted) {
                // Re-assert the flag so the loop condition in run() sees it and exits.
                Thread.currentThread().interrupt();
            }
        }

    }

    5、独立消费者作为一个项目(监听器)

    在application-context.xml文件配置bean,启动项目后就会执行这个监听器

    <bean class="com.gcard.longcode.manager.impl.MessageQueueServiceImpl" init-method="messageListener"/>
  • 相关阅读:
    hdu--4336--概率dp
    hdu--3905--dp
    codeforces--279--
    hdu--5023--线段树
    正则表达式
    vim编辑器使用
    圆头像控件,自动监听点击跳转到Activity
    ImageView切换两种状态下的模式
    string字符串截取
    Class对象获取方法
  • 原文地址:https://www.cnblogs.com/ouyanxia/p/9449458.html
Copyright © 2011-2022 走看看