zoukankan      html  css  js  c++  java
  • 用过消息队列?Kafka?能否手写一个消息队列?懵

    是否有同样的经历?面试官问你做过啥项目,我一顿胡侃,项目利用到了消息队列,kafka,rocketMQ等等。

    好的,那请开始你的表演,面试官递过一支笔:给我手写一个消息队列!!WHAT?

    为了大家遇到这种场景还能愉快的zhuangbi,所以写一篇文章,凑合用一下。

    想要实现一个消息队列,我们需要关组以下几点:

    1.首先有一个队列(FIFO)来存放消息

    2.消息队列容量有限

    3.需要入队,出队方法

    4.需要考虑多线程并发情况

    <1>.简单版:用LinkedList实现一个简单的消息队列

    这里用LinkedList来实现队列,然后通过synchronized关键字来实现多线程的互斥,用LinkedList的addLast方法实现队列的push,用LinkedList的removeFirst实现队列的remove方法

    //实现FIFO队列
    public class MyList<T> {
        private LinkedList<T> storage = new LinkedList<T>();
        private int statckSize = 2000;
        public synchronized void push(T e) {//需要加上同步
            storage.addLast(e);
        }
    
        public synchronized T peek() {
            if(storage!=null&&storage.size()>0){
                return storage.peekFirst();
            }
            return null;
    
        }
    
        public void remove() {
            storage.removeFirst();
        }
    
        public boolean empty() {
            return storage.isEmpty();
        }
    }
    View Code

    测试类:

    public class ListTest {
        public static void main(String[] args) {
            MyList<String> myList = new MyList<String>();
            for(String s : "the prefect code".split(" ")){//LIFO
                myList.push(s);
            }
            while(!myList.empty()){
                System.out.print(myList.peek()+" ");
                myList.remove();
            }
        }
    
    }
    View Code

    <2>.进阶版,仍然用LinkedList来实现队列,给出仓库的概念(消息队列仓库),生产者和消费者分别在独立线程中实现,使用object的wait(),notify()和synchronized()实现线程操作的同步与互斥(Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。)

    抽象仓库类:

    public interface AbstractStorage {
        void consumer(int num);
        void producer(int num);
    }
    View Code

    生产者线程:

    class Producer extends Thread {
        //生产数量
        private int num;
        //仓库
        private AbstractStorage abstractStorage;
    
        public Producer(AbstractStorage abstractStorage,int num){
            this.abstractStorage=abstractStorage;
            this.num=num;
        }
        // 调用仓库Storage的生产函数
        public void produce(int num){
            abstractStorage.producer(num);
        }
        // 线程run函数
        @Override
        public void run(){
            produce(num);
        }
    
    }
    View Code

    消费者线程:

    class Consumer extends Thread {
         //消费数量
         private int num;
         //仓库
         private AbstractStorage abstractStorage;
    
         public Consumer(AbstractStorage abstractStorage,int num){
            this.abstractStorage=abstractStorage;
            this.num=num;
         }
    
         public void consume(int num){
             abstractStorage.consumer(num);
         }
    
         @Override
        public void run(){
             consume(num);
         }
    
    }
    View Code

    消息队列(仓库)实现类:

    public class Storage1 implements AbstractStorage {
        //最大容量
        private final int MAX_SIZE = 100;
        //存储载体
        private LinkedList list =new LinkedList();
    
        @Override
        public void consumer(int num) {
            synchronized (list) {
                while (num > list.size()) {
                    try {
                        list.wait();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("【阻塞】当前要消费的数量:" + num + ",当前库存量:" + list.size() + "当前消费阻塞");
                }
                for (int i = 0; i < num; i++) {
                    list.removeFirst();
                }
                System.out.println("【consumer】 "+Thread.currentThread().getName()+" 已消费产品数:" + num + ",现库存数:" + list.size());
                list.notifyAll();
            }
        }
    
        //生产
        @Override
        public void producer(int num) {
            synchronized (list) {
                while (list.size() + num > MAX_SIZE) {
                    try {
                        list.wait();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    System.out.println("【阻塞】当前队列已满,生产阻塞");
                }
                for (int i = 0; i < num; i++) {
                    list.addLast(new Object());
                }
                System.out.println("【producer】 "+Thread.currentThread().getName()+ " 已生产产品数:" + num + ",现库存数:" + list.size());
                list.notifyAll();
            }
        }
    
    
    }
    View Code

    测试类:

    public class Test {
        public static void main(String[] args) {
            AbstractStorage abstractStorage =new Storage1();
    
            //生产者对象
            Producer p1 = new Producer(abstractStorage,10);
            Producer p2 = new Producer(abstractStorage,10);
            Producer p3 = new Producer(abstractStorage,10);
            Producer p4 = new Producer(abstractStorage,10);
            Producer p5 = new Producer(abstractStorage,10);
            Producer p6 = new Producer(abstractStorage,10);
            Producer p7 = new Producer(abstractStorage,10);
            Producer p8 = new Producer(abstractStorage,50);
            //消费者对象
            Consumer c1 = new Consumer(abstractStorage,20);
            Consumer c2 = new Consumer(abstractStorage,30);
            Consumer c3 = new Consumer(abstractStorage,50);
    
            c1.start();
            c2.start();
            c3.start();
    
            p1.start();
            p2.start();
            p3.start();
            p4.start();
            p5.start();
            p6.start();
            p7.start();
            p8.start();
    
        }
    
    
    
    }
    View Code

    最终结果显示,我们能实现简单的生产消费,并且是线程同步的。

  • 相关阅读:
    《linux 内核全然剖析》 笔记 CODE_SPACE 宏定义分析
    Item 8:析构函数不要抛出异常 Effective C++笔记
    Eclipse经常使用快捷键
    多人即时战斗游戏服务端系列[2]--90坦克Online游戏对象介绍以及渲染机制
    STM8S awu及看门狗IWDG WWDG应用(转)
    使用STM8SF103 ADC采样电压(转)
    BHS-STM32工具系列
    STM32 FLASH模拟EEPROM 使用和优化(转)
    STM32+NRF24L01无线(转)
    像51一样操作STM32的IO(转)
  • 原文地址:https://www.cnblogs.com/miketwais/p/10877992.html
Copyright © 2011-2022 走看看