zoukankan      html  css  js  c++  java
  • disruptor 高并发编程 简介demo

    原文地址:http://www.cnblogs.com/qiaoyihang/p/6479994.html  

      disruptor适用于大规模低延迟的并发场景。可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)、或者是基于内存的事件流处理机制的场景。

      disruptor的主要设计思想是无锁的高并发,特别适用于对时间高度敏感的多线程应用。如果app对时间不敏感完全可以不用disruptor 而只用array blocking queue,在设计上采用内存屏障的机制和CAS操作实现此思想。主流的并发程序 
      都离不开锁对资源的管控,或者尽量避开锁的使用。 
      其主要的实现原理总结有如下三点,当然还有很多地方设计得很巧妙,需要细细阅读源码和官方文档。虽然这个 过程对我来说很尴尬,但痛并快乐者,有朝闻道、夕可死也的感觉。 
      1.采用消费者-生产者模型进行读写的分离。 
      2.用循环缓存(实际是一个循环队列)实现了数据的暂存和读写速度的匹配。 
      3.用内存屏障加序列号的方式实现了无锁的并发机制。 

    在插一下cas机制

    CAS,又称Compare-and-Swap,代表一种原子操作

    一, 为每一个Node在Set的时候分配一个cas值,(本质是版本号,返回的Node和存储Node的cas值一样,每次要更新这个Node时要检查cas的值是否与取出来时一致)

    二, 只有在Update一个key的value时才会造成多线程冲突,只是Set/Get是不会的,单线程也不会并发问题。

    三, 如何维护每个线程/进程的cas的值:

            增加步进的概念:cas每次自增每个线程都不一样,这样的话,每个线程有一个确定的变量,如果是由其它线程修改的一定与本线程的cas不一样

            1, 每个线程/进程有一个初始化的index,如果有10个进程就是编号为0 ~9

            2, 每次cas值增加都是按进程数来加,step[0] += 10,这样能保证每个进程的cas都不会一样

            3,缺点是需要额外的初始化

    cas 使用场景:

    线程T1对key1、线程T2对Key1并发Get更新了Value值后想Set回去,可能会出现后一个操作覆盖前一个操作值,而且这个值是涉及到事务性的。正确是应该是T1 Set完后,T2才能取,串行化操作。

    CAS就是解决这个问题,如果发现cas值不一样了,就会Set失败,需要重取再设置,假定某时刻T1 的cas值为20,T2 的cas值为21。如果没有步进时,T1处理后cas值为21,T2再处理就认为没有改变过。

     

    disruptor的主要编程部件 
       1.Disruptor:用于控制整个消费者-生产者模型的处理器 
       2.RingBuffer:用于存放数据 
       3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。 
       4.EventFactory:事件工厂类。 
       5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。 
       6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。 
       7.用于运行disruptor的线程或者线程池。

     -disruptor编程主要的编程流程 
       1.定义事件 
       2.定义事件工厂 
       3.定义事件处理类 
       4.定义事件处理的线程或者线程池 
       5.指定等待策略 
       6.通过disruptor处理器组装生产者和消费者 
       7.发布事件 
       8.关闭disruptor业务逻辑处理器 

    disruptor实现无锁高并发,主要采用的消费者-生产者模型。所以编程的实践场景如下 
       1.一个生产者—一个消费者的场景 
       2.一个生产者—多个消费者的场景 
       3.多个生产者—一个消费者的场景 
       4.多个生产者—多个消费者的场景 

    记录一下自己写的demo,模拟三个消费者消费一个生产者的数据,最后等待所有线程都执行完毕才进行下一步操作:

    //首先定义一个事件
    public
    class MyEvent { private String name; private CountDownLatch countDownLatch; public CountDownLatch getCountDownLatch() { return countDownLatch; } public String getName() { return name; } public void setName(String name) { this.name = name; } public void setCountDownLatch(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void setMyEvent(MyEvent myEvent){ name = myEvent.name; countDownLatch = myEvent.countDownLatch; } }
    //生产数据的工厂
    public
    class MyEventFactory implements EventFactory<MyEvent> { @Override public MyEvent newInstance() { // TODO Auto-generated method stub return new MyEvent(); } }
    //数据构造
    public
    class MyEventProduce implements Runnable { private final int SIZE = 3; private CountDownLatch countDownLatch; private Disruptor<MyEvent> disruptor; public MyEventProduce() { countDownLatch = new CountDownLatch(SIZE); } public MyEventProduce setDisruptor(Disruptor<MyEvent> disruptor) { this.disruptor = disruptor; return this; } public CountDownLatch getCountDownLatch() { return countDownLatch; } @Override public void run() { for (int i = 1; i <= SIZE; i++) { MyEvent event = new MyEvent(); event.setName("name--" + i); event.setCountDownLatch(countDownLatch); disruptor.publishEvent(new MyEventTranslator(event)); } } }
    public class MyEventTranslator implements EventTranslator<MyEvent> {
        private MyEvent myEvent;
    
        public MyEventTranslator(MyEvent myEvent) {
            this.myEvent = myEvent;
        }
    
        @Override
        public void translateTo(MyEvent event, long sequence) {
            event.setMyEvent(myEvent);
        }
    
    }
    //第一个消费者
    public
    class Handler1 implements EventHandler<MyEvent>, WorkHandler<MyEvent> { private static final Logger log = LoggerFactory.getLogger(Handler1.class); @Override public void onEvent(MyEvent event) throws Exception { log.debug(event.getName() + "====Handler1 。。。。"); // throw new RuntimeException("测试异常"); } @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { System.out.println("not go"); onEvent(event); } }
    //第二个消费者
    public
    class Handler11 implements EventHandler<MyEvent>, WorkHandler<MyEvent> { private static final Logger log = LoggerFactory.getLogger(Handler11.class); @Override public void onEvent(MyEvent event) throws Exception { log.debug(event.getName() + "====Handler11 。。。。。"); } @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { onEvent(event); } }
    //第三个消费者
    public
    class Handler2 implements EventHandler<MyEvent>, WorkHandler<MyEvent> { private static final Logger log = LoggerFactory.getLogger(Handler2.class); @Override public void onEvent(MyEvent event) throws Exception { log.debug(event.getName() + "====Handler2........"); event.getCountDownLatch().countDown(); } @Override public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception { onEvent(event); } }
    //异常处理事件类
    public
    class MyHandlerException implements ExceptionHandler { /* * (non-Javadoc) * * @see * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable * , long, java.lang.Object) */ @Override public void handleEventException(Throwable ex, long sequence, Object event) { // TODO Auto-generated method stub System.out.println("MyHandlerException handleEventException..."); } /* * (non-Javadoc) * * @see * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. * Throwable) */ @Override public void handleOnStartException(Throwable ex) { System.out.println("MyHandlerException handleOnStartException..."); } /* * (non-Javadoc) * * @see * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang * .Throwable) */ @Override public void handleOnShutdownException(Throwable ex) { System.out.println("MyHandlerException handleOnShutdownException..."); } }
    //单元测试类
    public
    class TestDisruptor { private static final Logger log = LoggerFactory .getLogger(TestDisruptor.class); @Test public void myTest() throws Exception { Disruptor<MyEvent> disruptor = new Disruptor<>(new MyEventFactory(), 1024, Exector.newInstance().getExecutorService(), ProducerType.SINGLE, new YieldingWaitStrategy()); disruptor.handleExceptionsWith(new MyHandlerException()); disruptor.handleEventsWithWorkerPool(new Handler1()) .thenHandleEventsWithWorkerPool(new Handler11()) .thenHandleEventsWithWorkerPool(new Handler2()); disruptor.start(); MyEventProduce ep = new MyEventProduce().setDisruptor(disruptor); CountDownLatch countDownLatch = ep.getCountDownLatch(); Exector.newInstance().getExecutorService().submit(ep); countDownLatch.await(); disruptor.shutdown(); log.debug("运行完毕"); } public void testJDBC() throws Exception { Connection connection = DriverManager.getConnection("", "", ""); } }

    文章借鉴:http://blog.csdn.net/jeffsmish/article/details/53572043

    推荐文章:http://blog.163.com/zongyuan1987@126/blog/static/131623156201271021955717/

  • 相关阅读:
    cancel_delayed_work和flush_scheduled_work【转】
    linux中断申请之request_threaded_irq【转】
    VELT-0.1.5开发:使用kgdb调试Linux内核【转】
    iOS_第3方类库_BlurAlertView_GPUImage
    一个轻client,多语言支持,去中心化,自己主动负载,可扩展的实时数据写服务的实现方案讨论
    如果数据文件顺序被打乱,你有办法找回正确的文件把数据库打开吗?
    Hibernate级联操作和载入机制(二) cascade and fetch
    UVA 10557 XYZZY
    Arcgis for javascript map操作addLayer具体解释
    64位CentOS上编译 Hadoop 2.2.0
  • 原文地址:https://www.cnblogs.com/qiaoyihang/p/6479994.html
Copyright © 2011-2022 走看看