zoukankan      html  css  js  c++  java
  • 【并发】8、借助redis 实现多线程生产消费阻塞队列

    顾名思义这个就是再消费的时候,不是之前的那哥用yield进行线程切换的操作,而是用线程等待阻塞的方式去执行,说实话我感觉效率不一定有之前那个好,

    因为我对这种阻塞队列使用的时候,之前有发现阻塞队列,塞着塞着线程就会进入假死状态,这个很奇怪,但是有的时候又是好的,这个也不清楚到底是为什么

    但是毕竟也是一种实现,我就写出来了看看吧

    生产者

    package queue.redisQueue;
    
    import queue.fqueue.vo.TempVo;
    import redis.clients.jedis.Jedis;
    
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;
    import java.util.UUID;
    
    /**
     * @ProjectName: cutter-point
     * @Package: queue.redisQueue
     * @ClassName: RedisQueueProducter2
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/6/12 16:29
     * @Version: 1.0
     */
    public class RedisQueueProducter2 implements Runnable {
    
        private Jedis jedis;
        private String queueKey;
    
        public RedisQueueProducter2(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
        @Override
        public void run() {
    
            while(true) {
    
                try {
                    Thread.sleep((long) (Math.random() * 1000));
    
                    //不存在则创建,存在则直接插入
                    //向redis队列中存放数据
                    //生成数据
                    TempVo tempVo = new TempVo();
                    tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID());
                    //序列化为字节
                    ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(arrayOutputStream);
                    objectOutputStream.writeObject(tempVo);
                    arrayOutputStream.flush();
    
                    try {
                        int i = 0;
                        while(i < 10) {
                            long num = jedis.lpush(queueKey.getBytes(), arrayOutputStream.toByteArray());
                            if(num > 0) {
                                System.out.println("成功!");
                                break;
                            }
    
                            ++i;
                        }
                    } catch (Exception e) {
                        System.out.println("失败!");
    //                    long num = jedis.lpush(queueKey.getBytes(), arrayOutputStream.toByteArray());
                    }
    
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    }

    消费者

    package queue.redisQueue;
    
    import queue.fqueue.vo.EventVo;
    import redis.clients.jedis.Jedis;
    
    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.util.List;
    
    /**
     * @ProjectName: cutter-point
     * @Package: queue.redisQueue
     * @ClassName: RedisQueueConsume2
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/6/12 16:40
     * @Version: 1.0
     */
    public class RedisQueueConsume2 implements Runnable {
    
        private Jedis jedis;
        private String queueKey;
    
        public RedisQueueConsume2(Jedis jedis, String queueKey) {
            this.jedis = jedis;
            this.queueKey = queueKey;
        }
    
    
        @Override
        public void run() {
    
    
            while(true) {
                List<byte[]> bytesList = null;
                try{
                    //这种就是阻塞队列模式
                    bytesList = jedis.blpop(0, queueKey.getBytes());
                } catch (Exception e) {
    
                }
    
                //反序列化对象
                if(bytesList == null || bytesList.size() <= 0) {
                    Thread.yield();
                    continue;
                }
    
                //获取第二个对象,就是我们的字节数组
                System.out.println(new String(bytesList.get(0)));
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytesList.get(1));
                try {
                    ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream);
                    EventVo eventVo = (EventVo) objectInputStream.readObject();
    
                    eventVo.doOperater();
    
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    测试代码

    消费队列

     接下来我们把生产线程停掉

     此时队列还有

     我们把它消费完

     当只剩最后一个的时候

     

     可以进入下一步,好当队列为空的时候,我们再尝试去取数据的时候

     队列会阻塞再这个地方,相当于是挂起线程

  • 相关阅读:
    阿里云乌班图16配置-PHP环境(包括mysql及apache安装)
    mysql主从复制跳过错误
    64位系统下powerdesigner15连接oracle odbc
    解决“指定的服务已经标记为删除”问题
    mysql系列-安装及服务启动
    数据缓存管理
    redis-在乌班图下设置自动启动
    redis-配置文件
    redis安装
    linux-用户建立及权限分配
  • 原文地址:https://www.cnblogs.com/cutter-point/p/11011084.html
Copyright © 2011-2022 走看看