zoukankan      html  css  js  c++  java
  • Java高并发之设计模式

    本文主要讲解几种常见并行模式, 具体目录结构如下图.

     
     

    单例

    单例是最常见的一种设计模式, 一般用于全局对象管理, 比如xml配置读写之类的.

    一般分为懒汉式, 饿汉式.

    懒汉式: 方法上加synchronized

    public static synchronized Singleton getInstance() {

    if (single == null) {   

    single = new Singleton(); 

    }   

    return single; 

    }

    这种方式, 由于每次获取示例都要获取锁, 不推荐使用, 性能较差

    懒汉式: 使用双检锁 + volatile

    private volatile Singleton singleton = null;

    public static Singleton getInstance() {

    if (singleton == null) {

    synchronized (Singleton.class) {

    if (singleton == null) {

    singleton = new Singleton();

    }

    }

    }

    return singleton;

    }

    本方式是对直接在方法上加锁的一个优化, 好处在于只有第一次初始化获取了锁.

    后续调用getInstance已经是无锁状态. 只是写法上稍微繁琐点.

    至于为什么要volatile关键字, 主要涉及到jdk指令重排, 详见之前的CSDN博文: Java内存模型与指令重排

    懒汉式: 使用静态内部类

    public class Singleton {

    private static class LazyHolder {   

    private static final Singleton INSTANCE = new Singleton();   

    }   

    private Singleton (){}   

    public static final Singleton getInstance() {   

    return LazyHolder.INSTANCE;   

    }   

    }

    该方式既解决了同步问题, 也解决了写法繁琐问题. 推荐使用改写法.

    缺点在于无法响应事件来重新初始化INSTANCE.

    饿汉式

    public class Singleton1 {

    private Singleton1() {} 

    private static final Singleton1 single = new Singleton1(); 

    public static Singleton1 getInstance() { 

    return single; 

    }

    缺点在于对象在一开始就直接初始化了.

    Future模式

    该模式的核心思想是异步调用. 有点类似于异步的ajax请求.

    当调用某个方法时, 可能该方法耗时较久, 而在主函数中也不急于立刻获取结果.

    因此可以让调用者立刻返回一个凭证, 该方法放到另外线程执行,

    后续主函数拿凭证再去获取方法的执行结果即可, 其结构图如下

     
     

    jdk中内置了Future模式的支持, 其接口如下:

     
     

    通过FutureTask实现

    注意其中两个耗时操作.

    如果doOtherThing耗时2s, 则整个函数耗时2s左右.

    如果doOtherThing耗时0.2s, 则整个函数耗时取决于RealData.costTime, 即1s左右结束.

    public class FutureDemo1 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

    FutureTask future = new FutureTask(new Callable() {

    @Override

    public String call() throws Exception {

    return new RealData().costTime();

    }

    });

    ExecutorService service = Executors.newCachedThreadPool();

    service.submit(future);

    System.out.println("RealData方法调用完毕");

    // 模拟主函数中其他耗时操作

    doOtherThing();

    // 获取RealData方法的结果

    System.out.println(future.get());

    }

    private static void doOtherThing() throws InterruptedException {

    Thread.sleep(2000L);

    }

    }

    class RealData {

    public String costTime() {

    try {

    // 模拟RealData耗时操作

    Thread.sleep(1000L);

    return "result";

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    return "exception";

    }

    }

    通过Future实现

    与上述FutureTask不同的是, RealData需要实现Callable接口.

    public class FutureDemo2 {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

    ExecutorService service = Executors.newCachedThreadPool();

    Future future = service.submit(new RealData2());

    System.out.println("RealData2方法调用完毕");

    // 模拟主函数中其他耗时操作

    doOtherThing();

    // 获取RealData2方法的结果

    System.out.println(future.get());

    }

    private static void doOtherThing() throws InterruptedException {

    Thread.sleep(2000L);

    }

    }

    class RealData2 implements Callable{

    public String costTime() {

    try {

    // 模拟RealData耗时操作

    Thread.sleep(1000L);

    return "result";

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    return "exception";

    }

    @Override

    public String call() throws Exception {

    return costTime();

    }

    }

    另外Future本身还提供了一些额外的简单控制功能, 其API如下

    // 取消任务

    boolean cancel(boolean mayInterruptIfRunning);

    // 是否已经取消

    boolean isCancelled();

    // 是否已经完成

    boolean isDone();

    // 取得返回对象

    V get() throws InterruptedException, ExecutionException;

    // 取得返回对象, 并可以设置超时时间

    V get(long timeout, TimeUnit unit)

    throws InterruptedException, ExecutionException, TimeoutException;

    生产消费者模式

    生产者-消费者模式是一个经典的多线程设计模式. 它为多线程间的协作提供了良好的解决方案。

    在生产者-消费者模式中,通常由两类线程,即若干个生产者线程和若干个消费者线程。

    生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务。

    生产者和消费者之间则通过共享内存缓冲区进行通信, 其结构图如下

     
     

    PCData为我们需要处理的元数据模型, 生产者构建PCData, 并放入缓冲队列.

    消费者从缓冲队列中获取数据, 并执行计算.

    生产者核心代码

    while(isRunning) {

    Thread.sleep(r.nextInt(SLEEP_TIME));

    data = new PCData(count.incrementAndGet);

    // 构造任务数据

    System.out.println(data + " is put into queue");

    if (!queue.offer(data, 2, TimeUnit.SECONDS)) {

    // 将数据放入队列缓冲区中

    System.out.println("faild to put data : " + data);

    }

    }

    消费者核心代码

    while (true) {

    PCData data = queue.take();

    // 提取任务

    if (data != null) {

    // 获取数据, 执行计算操作

    int re = data.getData() * 10;

    System.out.println("after cal, value is : " + re);

    Thread.sleep(r.nextInt(SLEEP_TIME));

    }

    }

    生产消费者模式可以有效对数据解耦, 优化系统结构.

    降低生产者和消费者线程相互之间的依赖与性能要求.

    一般使用BlockingQueue作为数据缓冲队列, 他是通过锁和阻塞来实现数据之间的同步,

    如果对缓冲队列有性能要求, 则可以使用基于CAS无锁设计的ConcurrentLinkedQueue.

    分而治之

    严格来讲, 分而治之不算一种模式, 而是一种思想.

    它可以将一个大任务拆解为若干个小任务并行执行, 提高系统吞吐量.

    我们主要讲两个场景, Master-Worker模式, ForkJoin线程池.

    Master-Worker模式

    该模式核心思想是系统由两类进行协助工作: Master进程, Worker进程.

    Master负责接收与���配任务, Worker负责处理任务. 当各个Worker处理完成后,

    将结果返回给Master进行归纳与总结.

     
     

    假设一个场景, 需要计算100个任务, 并对结果求和, Master持有10个子进程.

    Master代码

    public class MasterDemo {

    // 盛装任务的集合

    private ConcurrentLinkedQueue workQueue = new ConcurrentLinkedQueue();

    // 所有worker

    private HashMap workers = new HashMap<>();

    // 每一个worker并行执行任务的结果

    private ConcurrentHashMap resultMap = new ConcurrentHashMap<>();

    public MasterDemo(WorkerDemo worker, int workerCount) {

    // 每个worker对象都需要持有queue的引用, 用于领任务与提交结果

    worker.setResultMap(resultMap);

    worker.setWorkQueue(workQueue);

    for (int i = 0; i < workerCount; i++) {

    workers.put("子节点: " + i, new Thread(worker));

    }

    }

    // 提交任务

    public void submit(TaskDemo task) {

    workQueue.add(task);

    }

    // 启动所有的子任务

    public void execute(){

    for (Map.Entry entry : workers.entrySet()) {

    entry.getValue().start();

    }

    }

    // 判断所有的任务是否执行结束

    public boolean isComplete() {

    for (Map.Entry entry : workers.entrySet()) {

    if (entry.getValue().getState() != Thread.State.TERMINATED) {

    return false;

    }

    }

    return true;

    }

    // 获取最终汇总的结果

    public int getResult() {

    int result = 0;

    for (Map.Entry entry : resultMap.entrySet()) {

    result += Integer.parseInt(entry.getValue().toString());

    }

    return result;

    }

    }

    Worker代码

    public class WorkerDemo implements Runnable{

    private ConcurrentLinkedQueue workQueue;

    private ConcurrentHashMap resultMap;

    @Override

    public void run() {

    while (true) {

    TaskDemo input = this.workQueue.poll();

    // 所有任务已经执行完毕

    if (input == null) {

    break;

    }

    // 模拟对task进行处理, 返回结果

    int result = input.getPrice();

    this.resultMap.put(input.getId() + "", result);

    System.out.println("任务执行完毕, 当前线程: " + Thread.currentThread().getName());

    }

    }

    public ConcurrentLinkedQueue getWorkQueue() {

    return workQueue;

    }

    public void setWorkQueue(ConcurrentLinkedQueue workQueue) {

    this.workQueue = workQueue;

    }

    public ConcurrentHashMap getResultMap() {

    return resultMap;

    }

    public void setResultMap(ConcurrentHashMap resultMap) {

    this.resultMap = resultMap;

    }

    }

    public class TaskDemo {

    private int id;

    private String name;

    private int price;

    public int getId() {

    return id;

    }

    public void setId(int id) {

    this.id = id;

    }

    public String getName() {

    return name;

    }

    public void setName(String name) {

    this.name = name;

    }

    public int getPrice() {

    return price;

    }

    public void setPrice(int price) {

    this.price = price;

    }

    }

    主函数测试

    MasterDemo master = new MasterDemo(new WorkerDemo(), 10);

    for (int i = 0; i < 100; i++) {

    TaskDemo task = new TaskDemo();

    task.setId(i);

    task.setName("任务" + i);

    task.setPrice(new Random().nextInt(10000));

    master.submit(task);

    }

    master.execute();

    while (true) {

    if (master.isComplete()) {

    System.out.println("执行的结果为: " + master.getResult());

    break;

    }

    }

    ForkJoin线程池

    该线程池是jdk7之后引入的一个并行执行任务的框架, 其核心思想也是将任务分割为子任务,

    有可能子任务还是很大, 还需要进一步拆解, 最终得到足够小的任务.

    将分割出来的子任务放入双端队列中, 然后几个启动线程从双端队列中获取任务执行.

    子任务执行的结果放到一个队列里, 另起线程从队列中获取数据, 合并结果.

     
     

    假设我们的场景需要计算从0到20000000L的累加求和. CountTask继承自RecursiveTask, 可以携带返回值.

    每次分解大任务, 简单的将任务划分为100个等规模的小任务, 并使用fork()提交子任务.

    在子任务中通过THRESHOLD设置子任务分解的阈值, 如果当前需要求和的总数大于THRESHOLD, 则子任务需要再次分解,

    如果子任务可以直接执行, 则进行求和操作, 返回结果. 最终等待所有的子任务执行完毕, 对所有结果求和.

    public class CountTask extends RecursiveTask{

    // 任务分解的阈值

    private static final int THRESHOLD = 10000;

    private long start;

    private long end;

    public CountTask(long start, long end) {

    this.start = start;

    this.end = end;

    }

    public Long compute() {

    long sum = 0;

    boolean canCompute = (end - start) < THRESHOLD;

    if (canCompute) {

    for (long i = start; i <= end; i++) {

    sum += i;

    }

    } else {

    // 分成100个小任务

    long step = (start + end) / 100;

    ArrayList subTasks = new ArrayList();

    long pos = start;

    for (int i = 0; i < 100; i++) {

    long lastOne = pos + step;

    if (lastOne > end) {

    lastOne = end;

    }

    CountTask subTask = new CountTask(pos, lastOne);

    pos += step + 1;

    // 将子任务推向线程池

    subTasks.add(subTask);

    subTask.fork();

    }

    for (CountTask task : subTasks) {

    // 对结果进行join

    sum += task.join();

    }

    }

    return sum;

    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

    ForkJoinPool pool = new ForkJoinPool();

    // 累加求和 0 -> 20000000L

    CountTask task = new CountTask(0, 20000000L);

    ForkJoinTask result = pool.submit(task);

    System.out.println("sum result : " + result.get());

    }

    }

    ForkJoin线程池使用一个无锁的栈来管理空闲线程, 如果一个工作线程暂时取不到可用的任务, 则可能被挂起.

    挂起的线程将被压入由线程池维护的栈中, 待将来有任务可用时, 再从栈中唤醒这些线程.



    作者:欧阳海阳
    链接:https://www.jianshu.com/p/b5e6100a4051
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    mongodb导入导出
    python笔记1
    C# 文件下载断点续传
    热水维修记事
    memcached笔记
    模拟登陆
    Nginx学习笔记之加强篇
    Redis学习笔记之基础篇
    Nginx学习笔记之应用篇
    Nginx 学习笔记之安装篇
  • 原文地址:https://www.cnblogs.com/DreamRecorder/p/9244196.html
Copyright © 2011-2022 走看看