zoukankan      html  css  js  c++  java
  • 读Cassandra源码之并发

    java 并发与线程池

    java并发包使用Executor框架来进行线程的管理,Executor将任务的提交与执行过程分开,直接使用Runnable表示任务。future获取返回值。ExecutorService 继承了Executor接口,提供生命周器的管理,包括运行,关闭,终止三种状态。

    ThreadPoolExecutor
    ThreadPoolExecutor 是ExecutorService的一个实现类。使用几个线程池来执行task,通常使用Executors工厂方法配置。
    ThreadPoolExecutor 允许提供一个BlockingQueue来保存正在等待执行的task,队列一般有三种:无界,有界,和同步移交(synchronous handoff)。

    newFixedThreadPool和newSingleThreadExecutor默认情况使用LinkedBlockingQueue。当任务增加的速度超过线程处理任务的速度时,队列大小会无限增加。会造成资源耗尽,内存溢出等问题。

    所以使用有界队列比较稳妥,但是引入了新的问题,队列满了后,新的任务如何处理。这种情况引入了饱和策略,JDK提供了几种不同的饱和策略。

    • Abort(中止) 会扔出一个RejectedExecution Exception,开发者根据此处理自己的业务代码

    • CallerRunsPolicy 不会抛弃任务,也不会抛出异常。而是将某些task回退给调用者,降低新任务的流量。

    无界队列:Executor提供的newFixedThreadPool和newSingleThreadExecutor在默认情况下将使用一个无界的LinkedBlockingQueue。无界队列当负载很大时,可能会导致资源耗尽

    有界队列:ArrayBlockingQueue,
    队列填满以后如何处理请求:
    需要使用饱和策略:
    1.就是reject,抛异常,开发者自己处理异常,决定策略。
    2.丢给主线程,主线程去处理任务

    在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节,

    同步移交:
    对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免排队,将任务从生产者直接移交给工作中线程。不放在工作队列里了
    Executors
    同时Executor也提供了线程池管理方法。可以调用Executors的静态工厂方法来创建一个线程池

    • newFixedThreadPool 固定大小的线程池,没达到最大线程数目时,提交一个任务创建一个线程,达到最大数目后,不再变化。

      public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
          return new ThreadPoolExecutor(nThreads, nThreads,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory);
      }
      
    • newCachedThreadPool 可缓存的线程池,没有线程最大数目限制。如果线程池当前规模超过了请求,就回收空闲线程,请求任务增加时,就添加新的线程。

    • newSingleThreadExecutor 单线程的Executor,如果线程异常结束,会创建另外一个线程来替代。确保任务在按队列中的顺序来串行执行。

    • newScheduledThreadPool 固定长度的线程池,而且以延迟或定时的方式来执行任务

    一个sample code

    ExecutorService executor = Executors.newSingleThreadExecutor();
            Callable<List<String>> callable;
            callable = new Callable<List<String>>(){
    
                @Override
                public List<String> call() throws Exception {
                    return readFile("src/concurrent/test.txt");
                }
    
            };
            Future<List<String>> future = executor.submit(callable);
            try {
                List<String> lines = future.get(5, TimeUnit.SECONDS);
                for(String line: lines) {
                    System.out.println(line);
                }
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

    完整源码stoneFang的github

    Google Guava的并发库

    https://github.com/google/guava/wiki/ListenableFutureExplained

    JDK中Future通过异步的方式计算返回结果,当并发操作时,在任务结束或者没结束的时候都会返回一个结果。Future是异步操作的一个引用句柄,确保在服务执行返回一个结果。

    ListenableFuture允许注册回调方法。可以一个小小的改进会支持更多的操作。
    对应JDK中的 ExecutorService.submit(Callable) 提交多线程异步运算的方式,Guava 提供了ListeningExecutorService 接口, 该接口返回 ListenableFuture 而相应的 ExecutorService 返回普通的 Future。将 ExecutorService 转为 ListeningExecutorService,可以使用MoreExecutors.listeningDecorator(ExecutorService)进行装饰。

    ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
      public Explosion call() {
        return pushBigRedButton();
      }
    });
    Futures.addCallback(explosion, new FutureCallback<Explosion>() {
      // we want this handler to run immediately after we push the big red button!
      public void onSuccess(Explosion explosion) {
        walkAwayFrom(explosion);
      }
      public void onFailure(Throwable thrown) {
        battleArchNemesis(); // escaped the explosion!
      }
    });
    

    与JDK的并发处理写了个对比的guava并发处理

    ```
    
    ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        Callable<List<String>> callable;
        callable = new Callable<List<String>>(){
    
            @Override
            public List<String> call() throws Exception {
                return readFile("src/concurrent/test.txt");
            }
    
        };
        ListenableFuture<List<String>> future = executor.submit(callable);
        Futures.addCallback(future, new FutureCallback<List<String>>() {
    
            public void onFailure(Throwable thrown) {
                System.out.println("error");
            }
    
            @Override
            public void onSuccess(List<String> result) {
                for(String line: result) {
                    System.out.println(line);
                }
    
            }
        });
    ```
    

    源码在My Github

    Cassandra的并发

    cassandra在jdk的concurrent包上封装了自己的并发处理,同时也在各处调用原生的jdk并发包以及google的guava并发处理包

    Cassandra并发框架

    Figure1——cassandra并发实现
    cassandra concurrent实现

    1. cassandra各个Stage是通过StageManger来进行管理的,StageManager 有个内部类ExecuteOnlyExecutor。

    2. ExecuteOnlyExecutor继承了ThreadPoolExecutor,实现了cassandra的LocalAwareExecutorSerivce接口

    3. LocalAwareExecutorService继承了Java的ExecutorService,构建了基本的任务模型。添加了两个自己的方法.
      execute方法用于trace跟踪。

      public void execute(Runnable command, ExecutorLocals locals);
      public void maybeExecuteImmediately(Runnable command);
      

      对于Executor中的默认execute方法,和LocalAwareExecutorSerive中的execute方法都是new 一个task,然后将task添加到queue中。而maybeExecuteImmedicatly方法则是判断下是否有正在执行的task或者work,如果没有则直接执行,而不添加到队列中。

      public void maybeExecuteImmediately(Runnable command)
      {
          //comment1
          FutureTask<?> ft = newTaskFor(command, null);
          if (!takeWorkPermit(false))
          {
              addTask(ft);
          }
          else
          {
              try
              {
                  ft.run();
              }
              finally
              {
                  returnWorkPermit();
                  maybeSchedule();
              }
          }
      }
      
    4. AbstractLocalAwareExecutorService实现LocalAwareExecutorSerive接口,提供了executor的实现以及ExecutorServie接口中的关于生命周期管理的方法实现,如submit,shoudown等方法。添加了addTask,和任务完成的方法onCompletion。

    5. SEPExecutor实现了LocalAwareExecutorService类,提供了addTask,onCompletion,maybeExecuteImmediately等方法的实现。同时负责队列的管理

    6. SharedExecutorPool,线程池管理,用来管理Executor

    Cassandra并发例子FlushWriter

    org.apache.cassandra.tools.nodetool.Flush
    org.apache.cassandra.service.StorageService.forceKeyspaceFlush
    org.apache.cassandra.db.ColumnFamily.forceBlockingFlush
    org.apache.cassandra.db.ColumnFamily.forceFlush
    public ListenableFuture<?> forceFlush(ReplayPosition flushIfDirtyBefore)
    {
        //1.需要处理的memtable data
        synchronized (data)
        {
    
            // memtable 的flush过程需要同时flush secondary index
            // during index build, 2ary index memtables can be dirty even if parent is not.  if so,
            // we want to flush the 2ary index ones too.
            boolean clean = true;
            for (ColumnFamilyStore cfs : concatWithIndexes())
                clean &= cfs.data.getView().getCurrentMemtable().isCleanAfter(flushIfDirtyBefore);
    
            if (clean)
            {
                // We could have a memtable for this column family that is being
                // flushed. Make sure the future returned wait for that so callers can
                // assume that any data inserted prior to the call are fully flushed
                // when the future returns (see #5241).
                ListenableFutureTask<?> task = ListenableFutureTask.create(new Runnable()
                {
                    public void run()
                    {
                        logger.trace("forceFlush requested but everything is clean in {}", name);
                    }
                }, null);
                //执行flush的线程
                postFlushExecutor.execute(task);
                return task;
            }
    
            return switchMemtable();
        }
    }
    

    data
    就是Memtables,以及在磁盘上的SSTables。需要使用synchronize来确保隔离性。在CF类初始化的时候会进行加载

    public ColumnFamilyStore(Keyspace keyspace,
    String columnFamilyName,
    int generation,
    CFMetaData metadata,
    Directories directories,
    boolean loadSSTables,
    boolean registerBookkeeping)
    {

        data = new Tracker(this, loadSSTables);
    
        if (data.loadsstables)
        {
            Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
            Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata);
            data.addInitialSSTables(sstables);
        } 
    }
    

    postFlushExecutor.execute(task);调用的就是ThreadPoolExecutor

     private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                              StageManager.KEEPALIVE,
                                                                                              TimeUnit.SECONDS,
                                                                                              new LinkedBlockingQueue<Runnable>(),
                                                                                              new NamedThreadFactory("MemtableFlushWriter"),
                                                                                              "internal");

    参考

    stoneFang的github

    https://wizardforcel.gitbooks.io/guava-tutorial/content/16.html

  • 相关阅读:
    洛谷P2505||bzoj2750 [HAOI2012]道路 && zkw线段树
    洛谷 P3462 [POI2007]ODW-Weights
    Xor-MST Codeforces
    101 to 010 Atcoder CODE FESTIVAL 2017 qual B D
    bzoj2125 最短路
    洛谷P1823 [COI2007] Patrik 音乐会的等待
    洛谷 P1121 环状最大两段子段和
    noip2017 逛公园
    洛谷 P1578 奶牛浴场
    洛谷 P1169||bzoj1057 [ZJOI2007]棋盘制作
  • 原文地址:https://www.cnblogs.com/stoneFang/p/6715269.html
Copyright © 2011-2022 走看看