zoukankan      html  css  js  c++  java
  • 多线程之事例

    1.【JAVA多线程】如何解决一个生产者与消费者问题

    如何解决一个生产者与消费者问题
    生产者与消费者问题是多线程同步的一个经典问题。生产者和消费者同时使用一块缓冲区,生产者生产商品放入缓冲区,消费者从缓冲区中取出商品。我们需要保证的是,当缓冲区满时,生产者不可生产商品;当缓冲区为空时,消费者不可取出商品。

    下面介绍java中几种解决同步问题的方式

    (1)wait()与notify()方法

    (2)Lock与Condition机制

    (3)BlockingQueue阻塞队列

    【1】wait()与notify()方法
    这两个方法是object类中的方法

    wait()用在以下场合:
    (1)当缓冲区满时,缓冲区调用wait()方法,使得生产者释放锁,当前线程阻塞,其他线程可以获得锁。

    (2)当缓冲区空时,缓冲区调用wait()方法,使得消费者释放锁,当前线程阻塞,其他线程可以获得锁。

    notify()用在以下场合:
    (1)当缓冲区未满时,生产者生产商品放入缓冲区,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁,此处的对象锁,锁住的是缓冲区。

    (2)当缓冲区不为空时,消费者从缓冲区中取出商品,然后缓冲区调用notify()方法,通知上一个因wait()方法释放锁的线程现在可以去获得锁了,同步块代码执行完成后,释放对象锁。

    /**
    * 生产者消费者问题
    */
    public class ProAndCon {
    //最大容量
    public static final int MAX_SIZE = 2;
    //存储媒介
    public static LinkedList<Integer> list = new LinkedList<>();

    class Producer implements Runnable {
    @Override
    public void run() {
    synchronized (list) {
    //仓库容量已经达到最大值
    while (list.size() == MAX_SIZE) {
    System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
    try {
    list.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    list.add(1);
    System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());
    list.notify();
    }
    }
    }

    class Consumer implements Runnable {


    @Override
    public void run() {
    synchronized (list) {
    while (list.size() == 0) {
    System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
    try {
    list.wait();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    list.removeFirst();
    System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());
    list.notify();
    }
    }
    }

    public static void main(String[] args) {
    ProAndCon proAndCon = new ProAndCon();
    Producer producer = proAndCon.new Producer();
    Consumer consumer = proAndCon.new Consumer();
    for (int i = 0; i < 10; i++) {
    Thread pro = new Thread(producer);
    pro.start();
    Thread con = new Thread(consumer);
    con.start();
    }
    }

    }
    运行结果:

    【2】Lock与Condition机制
    在JDK5.0之后,Java提供了Lock与Condition机制。Condition接口的await()和signal()是用来做同步的两种方法,它们的功能基本上和Object的wait()、nofity()相同,或者说可以取代它们,但是它们和Lock机制是直接挂钩的。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。

    package day1101;
    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class ProAndCon2 {
    public static final int MAX_SIZE = 2;
    public static LinkedList<Integer> list = new LinkedList<>();
    public static Lock lock = new ReentrantLock();
    //仓库满的条件变量
    public static Condition full = lock.newCondition();
    //仓库空的条件变量
    public static Condition empty = lock.newCondition();

    class Producer implements Runnable {

    @Override
    public void run() {
    lock.lock();
    while (list.size() == MAX_SIZE) {
    try {
    System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
    full.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    list.add(1);
    System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + list.size());
    //唤醒其他生产者与消费者线程
    full.signal();
    empty.signal();
    lock.unlock();
    }
    }

    class Consumer implements Runnable {

    @Override
    public void run() {
    lock.lock();
    while (list.size() == 0) {
    try {
    System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
    empty.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    list.removeFirst();
    System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + list.size());
    //唤醒其他生产者与消费者线程
    full.signal();
    empty.signal();
    lock.unlock();
    }
    }

    public static void main(String[] args) {
    ProAndCon2 proAndCon = new ProAndCon2();
    Producer producer = proAndCon.new Producer();
    Consumer consumer = proAndCon.new Consumer();

    for (int i = 0; i < 10; i++) {
    Thread pro = new Thread(producer);
    pro.start();
    Thread con = new Thread(consumer);
    con.start();
    }
    }
    }
    运行结果:


    【3】使用BlockingQueue阻塞队列
    什么是阻塞队列?
    如果向一个已经满了的队列中添加元素或者从空队列中移除元素,都将会导致线程阻塞,线程一直等待到有旧元素被移除或新元素被添加的时候,才能继续执行。

    符合这种情况的队列,称为阻塞队列。

    JDK 1.5 以后新增BlockingQueue接口,我们采用它实现类的其中两个类,ArrayBlockingQueue或者是LinkedBlockingQueue。

    怎么使用LinkedBlockingQueue?
    这里我们用LinkedBlockingQueue来解决生产者与消费者问题,主要用到它的两个方法,即put()与take()

    put():向阻塞队列中添加一个元素,队列满时,自动阻塞。

    take():从阻塞队列中取出一个元素,队列空时,自动阻塞。

    其实LinkedBlockingQueue底层使用的仍然是Lock与Condition机制,我们从源码就可以看出来

    //..............用到了Lock与Condition机制

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    //...........put方法

    /**
    * Inserts the specified element at the tail of this queue, waiting if
    * necessary for space to become available.
    *
    * @throws InterruptedException {@inheritDoc}
    * @throws NullPointerException {@inheritDoc}
    */
    public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
    /*
    * Note that count is used in wait guard even though it is
    * not protected by lock. This works because count can
    * only decrease at this point (all other puts are shut
    * out by lock), and we (or some other waiting put) are
    * signalled if it ever changes from capacity. Similarly
    * for all other uses of count in other wait guards.
    */
    while (count.get() == capacity) {
    notFull.await();
    }
    enqueue(node);
    c = count.getAndIncrement();
    if (c + 1 < capacity)
    notFull.signal();
    } finally {
    putLock.unlock();
    }
    if (c == 0)
    signalNotEmpty();
    }

    //...........take方法

    public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
    while (count.get() == 0) {
    notEmpty.await();
    }
    x = dequeue();
    c = count.getAndDecrement();
    if (c > 1)
    notEmpty.signal();
    } finally {
    takeLock.unlock();
    }
    if (c == capacity)
    signalNotFull();
    return x;
    }
    看得出来,LinkedBlockingQueue底层已经解决好了同步问题,我们可以很方便的使用它。

    代码演示:
    package day1024;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    /**
    * 解决生产者与消费者问题
    * 采用阻塞队列BlockingQueue
    */
    public class ProAndCon3 {
    public static final int MAX_SIZE = 2;
    public static BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(MAX_SIZE);

    class Producer implements Runnable {
    @Override
    public void run() {
    if (queue.size() == MAX_SIZE) {
    System.out.println("仓库已满,生产者" + Thread.currentThread().getName() + "不可生产.");
    }
    try {
    queue.put(1);
    System.out.println("生产者" + Thread.currentThread().getName() + "生产, 仓库容量为" + queue.size());

    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    class Consumer implements Runnable {


    @Override
    public void run() {
    if (queue.size() == 0) {
    System.out.println("仓库为空,消费者" + Thread.currentThread().getName() + "不可消费.");
    }
    try {
    queue.take();
    System.out.println("消费者" + Thread.currentThread().getName() + "消费,仓库容量为" + queue.size());

    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    public static void main(String[] args) {
    ProAndCon3 proAndCon = new ProAndCon3();
    Producer producer = proAndCon.new Producer();
    Consumer consumer = proAndCon.new Consumer();

    for (int i = 0; i < 10; i++) {
    Thread pro = new Thread(producer);
    pro.start();
    Thread con = new Thread(consumer);
    con.start();
    }
    }
    }
    运行结果就不贴了。
     

  • 相关阅读:
    C++中析构函数的作用,
    fp = fopen(s, "at") 中at 是啥意思,a 是append 追加的意思
    C++中 :: 的意思
    sed 指令
    make -e install ,,,make命令的-e选项!
    _AR="ar" _ARFLAGS="-ruv"
    gcc的-D和-U参数:宏的设置与取消 _CCFLAGS=" -w -enable-threads=posix -DLINUX -D_REENTRANT -DWORKONGN -Dlinux -D_GN_DETAIL_SDR_"
    GCC 编译详解
    RPC 编程 使用 RPC 编程是在客户机和服务器实体之间进行可靠通信的最强大、最高效的方法之一。它为在分布式计算环境中运行的几乎所有应用程序提供基础。
    vim插件ctags的安装和使用
  • 原文地址:https://www.cnblogs.com/awkflf11/p/12578569.html
Copyright © 2011-2022 走看看