ObservableRunnable
package com.dwz.concurrency2.chapter5;

import java.util.Objects;

/**
 * A {@link Runnable} that can publish {@link RunnableEvent}s to a single
 * {@link LifeCycleListener}.
 *
 * <p>Subclasses implement {@link #run()} and call
 * {@link #notifyChange(RunnableEvent)} whenever they want to report a state.
 */
public abstract class ObservableRunnable implements Runnable {

    /** Listener that receives every event passed to {@link #notifyChange(RunnableEvent)}; never {@code null}. */
    protected final LifeCycleListener listener;

    /**
     * Creates a runnable that reports to the given listener.
     *
     * @param listener receiver of the events; must not be {@code null}
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    public ObservableRunnable(final LifeCycleListener listener) {
        // Fail fast here instead of with an NPE later, when notifyChange is first called.
        this.listener = Objects.requireNonNull(listener, "listener");
    }

    /**
     * Forwards the event to the listener, on the calling thread.
     *
     * @param event the event to deliver
     */
    protected void notifyChange(final RunnableEvent event) {
        listener.onEvent(event);
    }

    /** States a runnable can report through a {@link RunnableEvent}. */
    public enum RunnableState {
        RUNNING, ERROR, DONE
    }

    /** Immutable value describing one reported state: the state, a thread and an optional cause. */
    public static class RunnableEvent {

        private final RunnableState state;
        private final Thread thread;
        private final Throwable cause;

        /**
         * @param state  the state being reported
         * @param thread the thread associated with the event
         * @param cause  the failure attached to the event, or {@code null} if there is none
         */
        public RunnableEvent(RunnableState state, Thread thread, Throwable cause) {
            this.state = state;
            this.thread = thread;
            this.cause = cause;
        }

        /** @return the state passed to the constructor */
        public RunnableState getState() {
            return state;
        }

        /** @return the thread passed to the constructor */
        public Thread getThread() {
            return thread;
        }

        /** @return the cause passed to the constructor; may be {@code null} */
        public Throwable getCause() {
            return cause;
        }
    }
}
LifeCycleListener
package com.dwz.concurrency2.chapter5;

import com.dwz.concurrency2.chapter5.ObservableRunnable.RunnableEvent;

/**
 * Callback contract for receiving {@link RunnableEvent}s.
 */
public interface LifeCycleListener {

    /**
     * Called with an event that was published to this listener.
     *
     * @param event the published event
     */
    void onEvent(RunnableEvent event);
}
ThreadLifeCycleObserver
package com.dwz.concurrency2.chapter5; import java.util.List; import com.dwz.concurrency2.chapter5.ObservableRunnable.RunnableEvent; public class ThreadLifeCycleObserver implements LifeCycleListener { private final Object LOCK = new Object(); public void concurrentQuery(List<String> ids) { if(ids == null || ids.isEmpty()) { return; } ids.stream().forEach(id -> new Thread(new ObservableRunnable(this) { @Override public void run() { try { notifyChange(new RunnableEvent(RunnableState.RUNNING, Thread.currentThread(), null)); System.out.println("query for the id " + id); Thread.sleep(1000L); int x = 1/0; notifyChange(new RunnableEvent(RunnableState.DONE, Thread.currentThread(), null)); } catch(Exception e) { notifyChange(new RunnableEvent(RunnableState.ERROR, Thread.currentThread(), e)); } } }, id).start()); } @Override public void onEvent(RunnableEvent event) { synchronized (LOCK) { System.out.println("The runnable [" + event.getThread().getName() + "] data changed and state is [" + event.getState() + "]"); } if(event.getCause() != null) { System.out.println("The runnable [" + event.getThread().getName() + "] process failed."); event.getCause().printStackTrace(); } } }
测试代码:
package com.dwz.concurrency2.chapter5;

import java.util.Arrays;

/**
 * Entry point that runs {@link ThreadLifeCycleObserver#concurrentQuery} for the ids "1" and "2".
 */
public class ThreadLifeCycleClient {

    /**
     * Creates an observer and asks it to query two ids.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        final ThreadLifeCycleObserver observer = new ThreadLifeCycleObserver();
        observer.concurrentQuery(Arrays.asList("1", "2"));
    }
}