// Publish a ConfigDataChangeEvent for this dataId/group/tenant; EventDispatcher.fireEvent
// passes it to every listener registered for that event class.
// NOTE(review): the meaning of the leading `true` argument is not visible here - confirm
// against the ConfigDataChangeEvent constructor.
EventDispatcher.fireEvent(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime()));
跟着代码一步步往下看:
public class EventDispatcher {

    /**
     * Registers a listener for each event type it declares interest in.
     *
     * <p>For every type returned by {@code listener.interest()} the matching entry is looked up
     * through {@code getEntry} and the listener is added to that entry's listener list with
     * {@code addIfAbsent}.
     *
     * @param listener the listener to register
     */
    public static void addEventListener(AbstractEventListener listener) {
        for (Class<? extends Event> eventType : listener.interest()) {
            getEntry(eventType).listeners.addIfAbsent(listener);
        }
    }

    /**
     * Delivers {@code event} to every listener registered for the event's runtime class.
     *
     * <p>An exception thrown by one listener is logged and swallowed so that the remaining
     * listeners are still invoked.
     *
     * @param event the event to dispatch
     * @throws IllegalArgumentException if {@code event} is {@code null}
     */
    public static void fireEvent(Event event) {
        if (event == null) {
            throw new IllegalArgumentException();
        }

        final Entry entry = getEntry(event.getClass());
        for (AbstractEventListener subscriber : entry.listeners) {
            try {
                subscriber.onEvent(event);
            } catch (Exception ex) {
                log.error(ex.toString(), ex);
            }
        }
    }
这个getEntry(event.getClass()).listeners需要重点看下
/**
 * Returns the entry registered for {@code eventType}, creating and registering one when no
 * entry for that type exists yet.
 *
 * <p>The lookup-then-insert sequence is retried until it either finds an existing entry or
 * succeeds in inserting its own.
 *
 * @param eventType the event class whose entry is wanted
 * @return the entry whose {@code eventType} is {@code eventType}
 */
static Entry getEntry(Class<? extends Event> eventType) {
    while (true) {
        for (Entry candidate : LISTENER_HUB) {
            if (candidate.eventType == eventType) {
                return candidate;
            }
        }

        Entry created = new Entry(eventType);
        // addIfAbsent returns false when the entry already exists; in that case loop again
        // and pick up the existing entry from the scan above.
        if (LISTENER_HUB.addIfAbsent(created)) {
            return created;
        }
    }
}
/**
 * Registry of event-type entries. getEntry() scans this list for an entry whose eventType
 * matches and, when none is found, appends a new Entry with addIfAbsent.
 */
static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();
LISTENER_HUB 中会预先注册好几个相应的 Entry(这部分我们后面再分析);拿到对应的 Entry 之后,就依次调用其中每个 listener 的 onEvent,把事件通知给所有 Listener。
@Service
public class AsyncNotifyService extends AbstractEventListener {

    /**
     * Declares the event types this listener subscribes to.
     *
     * @return a list containing only {@code ConfigDataChangeEvent.class}
     */
    @Override
    public List<Class<? extends Event>> interest() {
        List<Class<? extends Event>> interested = new ArrayList<Class<? extends Event>>();
        // Config-change events are what trigger the notification to the other servers.
        interested.add(ConfigDataChangeEvent.class);
        return interested;
    }

    /**
     * Handles a config-change event by building one {@code NotifySingleTask} per address in
     * the server list and submitting them, wrapped in a single {@code AsyncTask}, to
     * {@code EXCUTOR}. Events of any other type are ignored.
     *
     * @param event the dispatched event
     */
    @Override
    public void onEvent(Event event) {
        // ConfigDataChangeEvent instances may be produced concurrently.
        if (!(event instanceof ConfigDataChangeEvent)) {
            return;
        }

        ConfigDataChangeEvent change = (ConfigDataChangeEvent) event;
        long lastModified = change.lastModifiedTs;
        String dataId = change.dataId;
        String group = change.group;
        String tenant = change.tenant;
        String tag = change.tag;

        // Addresses of the servers that have to be notified.
        List<?> serverAddresses = serverListService.getServerList();

        // Any queue implementation would do here.
        Queue<NotifySingleTask> pending = new LinkedList<NotifySingleTask>();
        for (int idx = 0; idx < serverAddresses.size(); idx++) {
            String address = (String) serverAddresses.get(idx);
            pending.add(new NotifySingleTask(dataId, group, tenant, tag, lastModified, address, change.isBeta));
        }

        EXCUTOR.execute(new AsyncTask(httpclient, pending));
    }
也就是为服务列表中的每个地址各创建一个通知任务,逐个通知过去;建议大家结合观察者模式来理解这段逻辑。
/**
 * Pool on which AsyncTask notifications run. It is declared as Executor but created with
 * Executors.newScheduledThreadPool(100, ...); AsyncTask downcasts it to
 * ScheduledThreadPoolExecutor in order to schedule delayed retries.
 */
private static final Executor EXCUTOR = Executors.newScheduledThreadPool(100, new NotifyThreadFactory());
然后把这个通知任务扔到线程池里,交给另外的线程去异步执行通知;
class AsyncTask implements Runnable { public AsyncTask(CloseableHttpAsyncClient httpclient, Queue<NotifySingleTask> queue) { this.httpclient = httpclient; this.queue = queue; } @Override public void run() { executeAsyncInvoke(); } private void executeAsyncInvoke() {
while (!queue.isEmpty()) { NotifySingleTask task = queue.poll(); String targetIp = task.getTargetIP(); if (serverListService.getServerList().contains( targetIp)) { // 启动健康检查且有不监控的ip则直接把放到通知队列,否则通知 if (serverListService.isHealthCheck() && ServerListService.getServerListUnhealth().contains(targetIp)) { // target ip 不健康,则放入通知列表中 ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), LOCAL_IP, ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, task.target); // get delay time and set fail count to the task int delay = getDelayTime(task); Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>(); queue.add(task); AsyncTask asyncTask = new AsyncTask(httpclient, queue); ((ScheduledThreadPoolExecutor)EXCUTOR).schedule(asyncTask, delay, TimeUnit.MILLISECONDS); } else { HttpGet request = new HttpGet(task.url); request.setHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified())); request.setHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, LOCAL_IP); if (task.isBeta) { request.setHeader("isBeta", "true"); } httpclient.execute(request, new AyscNotifyCallBack(httpclient, task)); } } } } private Queue<NotifySingleTask> queue; private CloseableHttpAsyncClient httpclient; }
上面这段代码明天再慢慢解析吧,加油!