zoukankan      html  css  js  c++  java
  • FluxInterval实例及解析

    本文主要研究下FluxInterval的机制

    FluxInterval

    reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/FluxInterval.java

    /**
     * Periodically emits an ever increasing long value either via a ScheduledExecutorService
     * or a custom async callback function
     * @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
     */
    final class FluxInterval extends Flux<Long> {
    
    	final Scheduler timedScheduler;
    	
    	final long initialDelay;
    	
    	final long period;
    	
    	final TimeUnit unit;
    
    	FluxInterval(
    			long initialDelay, 
    			long period, 
    			TimeUnit unit, 
    			Scheduler timedScheduler) {
    		if (period < 0L) {
    			throw new IllegalArgumentException("period >= 0 required but it was " + period);
    		}
    		this.initialDelay = initialDelay;
    		this.period = period;
    		this.unit = Objects.requireNonNull(unit, "unit");
    		this.timedScheduler = Objects.requireNonNull(timedScheduler, "timedScheduler");
    	}
    	
    	@Override
    	public void subscribe(CoreSubscriber<? super Long> actual) {
    		Worker w = timedScheduler.createWorker();
    
    		IntervalRunnable r = new IntervalRunnable(actual, w);
    
    		actual.onSubscribe(r);
    
    		try {
    			w.schedulePeriodically(r, initialDelay, period, unit);
    		}
    		catch (RejectedExecutionException ree) {
    			if (!r.cancelled) {
    				actual.onError(Operators.onRejectedExecution(ree, r, null, null,
    						actual.currentContext()));
    			}
    		}
    	}
    }	
    

    可以看到这里利用Scheduler来创建一个定时调度任务IntervalRunnable

    IntervalRunnable

    	static final class IntervalRunnable implements Runnable, Subscription,
    	                                               InnerProducer<Long> {
    		final CoreSubscriber<? super Long> actual;
    		
    		final Worker worker;
    		
    		volatile long requested;
    		static final AtomicLongFieldUpdater<IntervalRunnable> REQUESTED =
    				AtomicLongFieldUpdater.newUpdater(IntervalRunnable.class, "requested");
    		
    		long count;
    		
    		volatile boolean cancelled;
    
    		IntervalRunnable(CoreSubscriber<? super Long> actual, Worker worker) {
    			this.actual = actual;
    			this.worker = worker;
    		}
    
    		@Override
    		public CoreSubscriber<? super Long> actual() {
    			return actual;
    		}
    
    		@Override
    		@Nullable
    		public Object scanUnsafe(Attr key) {
    			if (key == Attr.CANCELLED) return cancelled;
    
    			return InnerProducer.super.scanUnsafe(key);
    		}
    
    		@Override
    		public void run() {
    			if (!cancelled) {
    				if (requested != 0L) {
    					actual.onNext(count++);
    					if (requested != Long.MAX_VALUE) {
    						REQUESTED.decrementAndGet(this);
    					}
    				} else {
    					cancel();
    					
    					actual.onError(Exceptions.failWithOverflow("Could not emit tick " + count + " due to lack of requests" +
    							" (interval doesn't support small downstream requests that replenish slower than the ticks)"));
    				}
    			}
    		}
    		
    		@Override
    		public void request(long n) {
    			if (Operators.validate(n)) {
    				Operators.addCap(REQUESTED, this, n);
    			}
    		}
    		
    		@Override
    		public void cancel() {
    			if (!cancelled) {
    				cancelled = true;
    				worker.dispose();
    			}
    		}
    	}
    

    这里重点看requested变量,run方法每次判断requested,如果requested为0则销毁worker,否则则每次发射一个元素计数就减一 而subscriber如果有继续request的话,则会增加requested的值

    实例1

        public static void main(String[] args) throws InterruptedException {
            Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                    .doOnNext(e -> {
                        System.out.println(e);
                    }).doOnError(e -> e.printStackTrace());
    
            System.out.println("begin to subscribe");
            flux.subscribe(e -> {
                System.out.println(e);
                try {
                    TimeUnit.MINUTES.sleep(30);
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            });
            TimeUnit.MINUTES.sleep(30);
        }
    

    这个例子requested是Long.MAX_VALUE,但是由于subscribe的线程跟运行interval的线程一样,由于里头执行了sleep操作也导致interval的调度也跟着阻塞住了。

    实例2

        public static void main(String[] args) throws InterruptedException {
            Flux<Long> flux = Flux.interval(Duration.ofMillis(1))
                    .doOnNext(e -> {
                        System.out.println(e);
                    })
                    //NOTE 这里request prefetch=256个
                    .publishOn(Schedulers.newElastic("publish-thread"))
                    .doOnError(e -> e.printStackTrace());
    
            System.out.println("begin to subscribe");
            AtomicInteger count = new AtomicInteger(0);
            //NOTE 得有subscribe才能触发request
            flux.subscribe(e -> {
                LOGGER.info("receive:{}",e);
                try {
                    //NOTE 使用publishOn将subscribe与interval的线程分开
                    if(count.get() == 0){
                        TimeUnit.MINUTES.sleep(2);
                    }
                    count.incrementAndGet();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
            });
            TimeUnit.MINUTES.sleep(30);
        }
    

    使用publishOn将subscriber线程与interval线程隔离,使其sleep不阻塞interval 这里publishOn隐含了一个prefetch参数,默认是Queues.SMALL_BUFFER_SIZE即Math.max(16,Integer.parseInt(System.getProperty("reactor.bufferSize.small", "256")));

    	public final Flux<T> publishOn(Scheduler scheduler) {
    		return publishOn(scheduler, Queues.SMALL_BUFFER_SIZE);
    	}
    
    	final Flux<T> publishOn(Scheduler scheduler, boolean delayError, int prefetch, int lowTide) {
    		if (this instanceof Callable) {
    			if (this instanceof Fuseable.ScalarCallable) {
    				@SuppressWarnings("unchecked")
    				Fuseable.ScalarCallable<T> s = (Fuseable.ScalarCallable<T>) this;
    				try {
    					return onAssembly(new FluxSubscribeOnValue<>(s.call(), scheduler));
    				}
    				catch (Exception e) {
    					//leave FluxSubscribeOnCallable defer exception call
    				}
    			}
    			@SuppressWarnings("unchecked")
    			Callable<T> c = (Callable<T>)this;
    			return onAssembly(new FluxSubscribeOnCallable<>(c, scheduler));
    		}
    
    		return onAssembly(new FluxPublishOn<>(this, scheduler, delayError, prefetch, lowTide, Queues.get(prefetch)));
    	}
    

    这里使用Queues.get(prefetch)创建一个间接的队列来盛放元素

    这个实例最后输出

    //......
    21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:254
    21:06:03.108 [publish-thread-2] INFO com.example.demo.FluxTest - receive:255
    reactor.core.Exceptions$OverflowException: Could not emit tick 256 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    	at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121)
    	at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
    	at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    	at java.lang.Thread.run(Thread.java:745)
    

    由于第一次request默认是256,之后在发射256个元素之后,subscriber没有跟上,导致interval的worker被cancel掉了,于是后续消费完256个元素之后,紧挨着就是OverflowException这个异常

    小结

    reactor本身并不依赖线程,只有interval,delayElements等方法才会创建线程。而reactor本身是观察者设计模式的扩展,采用push+backpressure模式,一开始调用subscribe方法就触发request N请求推送数据,之后publisher就onNext推送数据,直到complete或cancel。实例1是因为线程阻塞导致interval的onNext阻塞,实例2是interval被cancel掉导致flux关闭。

    转载于:https://my.oschina.net/go4it/blog/1622063

  • 相关阅读:
    1.4.2.3. SETUP(Core Data 应用程序实践指南)
    1.4.2.2. PATHS(Core Data 应用程序实践指南)
    1.4.2.1. FILES(Core Data 应用程序实践指南)
    1.4.2. 实现 Core Data Helper 类(Core Data 应用程序实践指南)
    1.4.1. Core Data Helper 简介(Core Data 应用程序实践指南)
    1.4. 为现有的应用程序添加 Core Data 支持(Core Data 应用程序实践指南)
    1.3.2. App Icon 和 Launch Image(Core Data 应用程序实践指南)
    1.3.1. 新建Xcode项目并设置故事板(Core Data 应用程序实践指南)
    php验证邮箱是否合法
    如何使js函数异步执行
  • 原文地址:https://www.cnblogs.com/twodog/p/12137484.html
Copyright © 2011-2022 走看看