zoukankan      html  css  js  c++  java
  • 65.源码解析:EventBus

    1.使用

    注册:
    public  class MyBaseFragment extends Fragment {

    @Override
    public void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    //注册EventBus
    EventBus.getDefault().register(this);
    }

    @Override
    public void onDestroy() {
    super.onDestroy();
    //反注册EventBus
    EventBus.getDefault().unregister(this);
    }

    public void onEvent(MyBaseEvent event){
    //接收消息-默认类型,必须定义,不然程序会报错!!!但是没卵用
    }
    public void onEventMainThread(MyBaseEvent event){
    //接收消息-主线程
    }

    public void onEventBackgroundThread(MyBaseEvent event){
    //接收消息-后台线程
    }
    发布:
    // Publish a MyBaseEvent constructed with MyEventEnum.AddPkEnd through the default EventBus instance.
    EventBus.getDefault().post(new MyBaseEvent(MyEventEnum.AddPkEnd));
    订阅:
    @Override
    public void onEventMainThread(MyBaseEvent ev) {
    super.onEventMainThread(ev);
    if (ev.getAction() == MyEventEnum.AddPkEnd) {
    tvPkNumer.setText("" + MyConfig.pkNumber);
    }


    2.源码解析

    (1)注册
    /**
     * Returns the shared default EventBus, creating it on first use.
     *
     * The instance is checked once without a lock and once more while holding
     * the {@code EventBus.class} monitor, so at most one instance is created.
     * NOTE(review): this lazy-init pattern is only safe if {@code defaultInstance}
     * is declared {@code volatile}; the field declaration is not visible here -- confirm.
     *
     * @return the default EventBus instance, never {@code null}
     */
    public static EventBus getDefault() {
        if (defaultInstance == null) {
            synchronized (EventBus.class) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (defaultInstance == null) {
                    defaultInstance = new EventBus();
                }
            }
        }
        return defaultInstance;
    }

    /**
     * Registers the given subscriber as non-sticky with priority 0 by
     * delegating to the three-argument register overload.
     *
     * @param subscriber the object whose onEvent* methods should receive events
     */
    public void register(Object subscriber) {
    register(subscriber, false, 0);
    }

    /**
     * Looks up every subscriber method declared for the subscriber's class and
     * subscribes each of them. Runs while holding this EventBus's monitor.
     *
     * @param subscriber the object being registered
     * @param sticky     forwarded unchanged to subscribe for every method
     * @param priority   forwarded unchanged to subscribe for every method
     */
    private synchronized void register(Object subscriber, boolean sticky, int priority) {
        Class<?> subscriberClass = subscriber.getClass();
        for (SubscriberMethod handler : subscriberMethodFinder.findSubscriberMethods(subscriberClass)) {
            subscribe(subscriber, handler, sticky, priority);
        }
    }
    (2)根据订阅类,找到订阅类中的所有订阅方法(以onEvent开头的方法……)
    /**
     * Finds all event handler methods of a subscriber class.
     *
     * Results are cached in {@code methodCache} keyed by class name. On a cache
     * miss the class and its superclasses are scanned (stopping at the first
     * class whose name starts with "java.", "javax." or "android.") for methods
     * whose name starts with {@code ON_EVENT_METHOD_NAME}, that are public, carry
     * none of the {@code MODIFIERS_IGNORE} bits, and take exactly one parameter.
     * The text after the prefix selects the thread mode: empty, "MainThread",
     * "BackgroundThread" or "Async".
     *
     * @param subscriberClass the class to inspect
     * @return the subscriber methods found (also stored in the cache)
     * @throws EventBusException if a method has an unknown suffix and its class
     *         is not listed in {@code skipMethodVerificationForClasses}, or if
     *         no subscriber method is found at all
     */
    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        String cacheKey = subscriberClass.getName();
        List<SubscriberMethod> cached;
        synchronized (methodCache) {
            cached = methodCache.get(cacheKey);
        }
        if (cached != null) {
            return cached;
        }

        List<SubscriberMethod> found = new ArrayList<SubscriberMethod>();
        HashSet<String> seenMethodKeys = new HashSet<String>();
        StringBuilder keyBuilder = new StringBuilder();

        for (Class<?> current = subscriberClass; current != null; current = current.getSuperclass()) {
            String className = current.getName();
            if (className.startsWith("java.") || className.startsWith("javax.")
                    || className.startsWith("android.")) {
                // Skip system classes, this just degrades performance
                break;
            }

            for (Method candidate : current.getDeclaredMethods()) {
                String methodName = candidate.getName();
                if (!methodName.startsWith(ON_EVENT_METHOD_NAME)) {
                    continue;
                }

                int modifiers = candidate.getModifiers();
                boolean isPublic = (modifiers & Modifier.PUBLIC) != 0;
                boolean hasIgnoredModifier = (modifiers & MODIFIERS_IGNORE) != 0;
                if (!isPublic || hasIgnoredModifier) {
                    if (!skipMethodVerificationForClasses.containsKey(current)) {
                        Log.d(EventBus.TAG, "Skipping method (not public, static or abstract): " + current + "."
                                + methodName);
                    }
                    continue;
                }

                Class<?>[] parameterTypes = candidate.getParameterTypes();
                if (parameterTypes.length != 1) {
                    continue;
                }

                // The part of the name after the prefix decides the delivery thread.
                String suffix = methodName.substring(ON_EVENT_METHOD_NAME.length());
                ThreadMode threadMode;
                if (suffix.length() == 0) {
                    threadMode = ThreadMode.PostThread;
                } else if ("MainThread".equals(suffix)) {
                    threadMode = ThreadMode.MainThread;
                } else if ("BackgroundThread".equals(suffix)) {
                    threadMode = ThreadMode.BackgroundThread;
                } else if ("Async".equals(suffix)) {
                    threadMode = ThreadMode.Async;
                } else if (skipMethodVerificationForClasses.containsKey(current)) {
                    continue;
                } else {
                    throw new EventBusException("Illegal onEvent method, check for typos: " + candidate);
                }

                Class<?> eventType = parameterTypes[0];
                keyBuilder.setLength(0);
                keyBuilder.append(methodName).append('>').append(eventType.getName());
                // Only add if not already found in a sub class
                if (seenMethodKeys.add(keyBuilder.toString())) {
                    found.add(new SubscriberMethod(candidate, threadMode, eventType));
                }
            }
        }

        if (found.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass + " has no public methods called "
                    + ON_EVENT_METHOD_NAME);
        }
        synchronized (methodCache) {
            methodCache.put(cacheKey, found);
        }
        return found;
    }
    (3)遍历订阅类中的订阅方法,存进一个全局的Map,key为eventType, value为订阅方法的包装类(持有订阅者、订阅方法(方法、线程模式、事件类型)、优先级)
    这里:Value的类型为CopyOnWriteArrayList,它是线程安全的、用法类似ArrayList的List实现(并不继承自ArrayList)
    /**
     * Records one subscription (subscriber + method + priority) in the
     * per-event-type list and in the per-subscriber list of event types.
     *
     * The new subscription is inserted before the first existing entry with a
     * strictly lower priority, or appended when there is none. When
     * {@code sticky} is set and a sticky event is stored for the event type,
     * that event is delivered to the new subscription right away.
     *
     * @param subscriber       the object that owns the handler method
     * @param subscriberMethod the handler method description
     * @param sticky           whether a stored sticky event should be delivered immediately
     * @param priority         ordering value; higher values are placed earlier in the list
     * @throws EventBusException if an equal subscription is already in the list
     */
    private void subscribe(Object subscriber, SubscriberMethod subscriberMethod, boolean sticky, int priority) {
        Class<?> eventType = subscriberMethod.eventType;
        CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
        Subscription newSubscription = new Subscription(subscriber, subscriberMethod, priority);
        if (subscriptions == null) {
            subscriptions = new CopyOnWriteArrayList<Subscription>();
            subscriptionsByEventType.put(eventType, subscriptions);
        } else if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }

        // Starting with EventBus 2.2 methods are enforced to be public, so no
        // setAccessible(true) call is made on the handler method here.

        // Find the first entry with a lower priority; default to appending at the end.
        int count = subscriptions.size();
        int insertAt = count;
        for (int i = 0; i < count; i++) {
            if (newSubscription.priority > subscriptions.get(i).priority) {
                insertAt = i;
                break;
            }
        }
        subscriptions.add(insertAt, newSubscription);

        List<Class<?>> typesForSubscriber = typesBySubscriber.get(subscriber);
        if (typesForSubscriber == null) {
            typesForSubscriber = new ArrayList<Class<?>>();
            typesBySubscriber.put(subscriber, typesForSubscriber);
        }
        typesForSubscriber.add(eventType);

        if (!sticky) {
            return;
        }
        Object stickyEvent;
        synchronized (stickyEvents) {
            stickyEvent = stickyEvents.get(eventType);
        }
        if (stickyEvent != null) {
            // If the subscriber is trying to abort the event, it will fail (event is not tracked in posting state)
            // --> Strange corner case, which we don't take care of here.
            postToSubscription(newSubscription, stickyEvent, Looper.getMainLooper() == Looper.myLooper());
        }
    }

    (4)发布事件,使用ThreadLocal,为每个线程分别获取一个事件发布状态的副本,遍历事件队列,循环发布事件
    这里:isMainThread存储了当前是否是主线程,后面会用到
    /**
     * Posts an event to the bus.
     *
     * The event is appended to the queue held in the current posting state
     * (obtained from {@code currentPostingThreadState}; presumably a
     * ThreadLocal -- confirm against the field declaration). If that state is
     * already posting, the method returns and the running loop picks the
     * event up; otherwise this call drains the queue itself.
     *
     * @param event the event object to deliver
     * @throws EventBusException if the posting state is still marked as canceled
     */
    public void post(Object event) {
        EventBus.PostingThreadState state = (EventBus.PostingThreadState) currentPostingThreadState.get();
        List pending = state.eventQueue;
        pending.add(event);
        if (state.isPosting) {
            // A drain loop is already running on this state; it will deliver the event.
            return;
        }

        // Remember whether the poster is the main thread; used later for dispatching.
        state.isMainThread = Looper.getMainLooper() == Looper.myLooper();
        state.isPosting = true;
        if (state.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }

        try {
            while (!pending.isEmpty()) {
                postSingleEvent(pending.remove(0), state);
            }
        } finally {
            state.isPosting = false;
            state.isMainThread = false;
        }
    }
    (5)遍历所有的事件类型
    /**
     * Delivers one event to its subscribers.
     *
     * With {@code eventInheritance} enabled, the event is posted for every type
     * returned by {@code lookupAllEventTypes}; otherwise only for its own class.
     * When nothing was delivered, optionally logs a message and/or posts a
     * {@code NoSubscriberEvent} (never for NoSubscriberEvent or
     * SubscriberExceptionEvent themselves).
     *
     * @param event        the event to deliver
     * @param postingState the posting state of the current post call
     */
    private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error {
        Class eventClass = event.getClass();
        boolean delivered;
        if (!eventInheritance) {
            delivered = postSingleEventForEventType(event, postingState, eventClass);
        } else {
            delivered = false;
            List eventTypes = lookupAllEventTypes(eventClass);
            int total = eventTypes.size();
            for (int index = 0; index < total; index++) {
                Class candidateType = (Class) eventTypes.get(index);
                // Non-short-circuit OR: every type is posted even after a hit.
                delivered |= postSingleEventForEventType(event, postingState, candidateType);
            }
        }

        if (delivered) {
            return;
        }
        if (logNoSubscriberMessages) {
            Log.d(TAG, "No subscribers registered for event " + eventClass);
        }
        boolean isInternalEvent = eventClass == NoSubscriberEvent.class
                || eventClass == SubscriberExceptionEvent.class;
        if (sendNoSubscriberEvent && !isInternalEvent) {
            post(new NoSubscriberEvent(this, event));
        }
    }
    (6)根据事件类型,到全局的Map中去查找订阅方法的包装类集合,再遍历包装类集合,发布事件给订阅者
    /**
     * Posts the event to every subscription registered for one event type.
     *
     * The subscription list is read while holding this object's monitor. For
     * each subscription the posting state is filled in, the event is
     * dispatched, and the state is reset in a finally block; iteration stops
     * early when the state was marked canceled during the dispatch.
     *
     * @param event        the event to deliver
     * @param postingState the posting state of the current post call
     * @param eventClass   the event type whose subscriptions are looked up
     * @return {@code true} if a non-empty subscription list existed, {@code false} otherwise
     */
    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> targets;
        synchronized (this) {
            targets = subscriptionsByEventType.get(eventClass);
        }
        if (targets == null || targets.isEmpty()) {
            return false;
        }

        for (Subscription target : targets) {
            postingState.event = event;
            postingState.subscription = target;
            boolean canceledByHandler = false;
            try {
                postToSubscription(target, event, postingState.isMainThread);
                canceledByHandler = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (canceledByHandler) {
                break;
            }
        }
        return true;
    }
    (7)根据订阅方法的线程模式,以及当前的线程类型,分发执行订阅方法
    /**
     * Dispatches an event to one subscription according to its thread mode.
     *
     * PostThread invokes directly. MainThread invokes directly when already on
     * the main thread, otherwise enqueues on {@code mainThreadPoster}.
     * BackgroundThread enqueues on {@code backgroundPoster} when on the main
     * thread, otherwise invokes directly. Async always enqueues on
     * {@code asyncPoster}.
     *
     * @param subscription the target subscription
     * @param event        the event to deliver
     * @param isMainThread whether the caller is running on the main thread
     * @throws IllegalStateException for a thread mode not handled above
     */
    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case PostThread:
                invokeSubscriber(subscription, event);
                return;
            case MainThread:
                if (!isMainThread) {
                    mainThreadPoster.enqueue(subscription, event);
                    return;
                }
                invokeSubscriber(subscription, event);
                return;
            case BackgroundThread:
                if (!isMainThread) {
                    invokeSubscriber(subscription, event);
                    return;
                }
                backgroundPoster.enqueue(subscription, event);
                return;
            case Async:
                asyncPoster.enqueue(subscription, event);
                return;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

    (8-1)使用反射,在当前线程执行订阅者的订阅方法
    /**
     * Calls the subscription's handler method reflectively on the current thread.
     *
     * An exception thrown by the handler is unwrapped and passed to
     * {@code handleSubscriberException}; an access failure is rethrown as an
     * {@link IllegalStateException}.
     *
     * @param subscription the subscription whose method is invoked
     * @param event        the single argument passed to the handler
     */
    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (IllegalAccessException accessFailure) {
            throw new IllegalStateException("Unexpected exception", accessFailure);
        } catch (InvocationTargetException handlerFailure) {
            handleSubscriberException(subscription, event, handlerFailure.getCause());
        }
    }
    (8-2)切换到主线程执行订阅方法,其实就是用了Handler
    /**
     * Queues an event for delivery by this handler.
     *
     * Under this object's monitor the pending post is enqueued and, if the
     * handler is not already marked active, it is marked active and a message
     * is sent to itself to start processing.
     *
     * @param subscription the target subscription
     * @param event        the event to deliver
     * @throws EventBusException if the handler message could not be sent
     */
    void enqueue(Subscription subscription, Object event) {
        PendingPost pending = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pending);
            if (handlerActive) {
                // A message is already scheduled; it will pick this post up.
                return;
            }
            handlerActive = true;
            if (!sendMessage(obtainMessage())) {
                throw new EventBusException("Could not send handler message");
            }
        }
    }

    /**
     * Drains the pending-post queue, invoking each post through the event bus.
     *
     * Stops when the queue is empty (marking the handler inactive), or when the
     * time spent in this call reaches maxMillisInsideHandleMessage, in which
     * case a new message is sent to itself so processing continues in a later
     * handleMessage call.
     */
    @Override
    public void handleMessage(Message msg) {
    // Becomes true only when a follow-up message was sent successfully.
    boolean rescheduled = false;
    try {
    long started = SystemClock.uptimeMillis();
    while (true) {
    PendingPost pendingPost = queue.poll();
    if (pendingPost == null) {
    synchronized (this) {
    // Check again, this time in synchronized
    pendingPost = queue.poll();
    if (pendingPost == null) {
    // Queue is really empty: mark inactive so the next enqueue sends a new message.
    handlerActive = false;
    return;
    }
    }
    }
    eventBus.invokeSubscriber(pendingPost);
    // Elapsed time is measured from the start of this handleMessage call, not per post.
    long timeInMethod = SystemClock.uptimeMillis() - started;
    if (timeInMethod >= maxMillisInsideHandleMessage) {
    // Time budget used up: hand the rest of the queue to a follow-up message.
    if (!sendMessage(obtainMessage())) {
    throw new EventBusException("Could not send handler message");
    }
    rescheduled = true;
    return;
    }
    }
    } finally {
    // Stays active only if a follow-up message was scheduled; any other exit
    // (empty queue or an exception) leaves the handler inactive.
    handlerActive = rescheduled;
    }
    }

    (8-3)切换到后台线程执行订阅方法,其实就是用了Executors.newCachedThreadPool()逐个控制并发
    /**
     * Queues an event for background delivery.
     *
     * Under this object's monitor the pending post is enqueued and, if this
     * poster is not already marked as running, it is marked running and
     * submitted to the event bus's executor service.
     *
     * @param subscription the target subscription
     * @param event        the event to deliver
     */
    public void enqueue(Subscription subscription, Object event) {
        PendingPost pending = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pending);
            if (executorRunning) {
                // The running task will pick this post up.
                return;
            }
            executorRunning = true;
            eventBus.getExecutorService().execute(this);
        }
    }

    /**
     * Drains the pending-post queue on the executor thread, invoking the posts
     * one after another.
     *
     * Each iteration calls {@code queue.poll(1000)} (presumably a wait of up to
     * 1000 ms -- confirm against the queue implementation). When that returns
     * nothing, the queue is polled once more under this object's monitor and
     * the task ends if it is still empty. If the thread is interrupted, the
     * interruption is logged and the thread's interrupt status is restored so
     * the executor or other code further up the stack can still observe it.
     */
    @Override
    public void run() {
        try {
            while (true) {
                PendingPost pendingPost = queue.poll(1000);
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            executorRunning = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
            }
        } catch (InterruptedException e) {
            Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
            // Catching InterruptedException clears the flag; set it again instead of swallowing it.
            Thread.currentThread().interrupt();
        } finally {
            // Always allow a later enqueue to submit this task again.
            executorRunning = false;
        }
    }

    (8-4)切换到异步线程执行订阅方法,其实就是用了Executors.newCachedThreadPool()动态控制并发
    /**
     * Queues an event and submits this poster to the event bus's executor
     * service once per enqueued event.
     *
     * @param subscription the target subscription
     * @param event        the event to deliver
     */
    public void enqueue(Subscription subscription, Object event) {
        queue.enqueue(PendingPost.obtainPendingPost(subscription, event));
        eventBus.getExecutorService().execute(this);
    }

    /**
     * Takes exactly one pending post from the queue and invokes it.
     *
     * @throws IllegalStateException if the queue holds no pending post
     */
    @Override
    public void run() {
        PendingPost next = queue.poll();
        if (next == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(next);
    }

    /**
     * Delivers a queued post: copies its event and subscription, releases the
     * PendingPost object, and invokes the subscriber only if the subscription
     * is still marked active.
     *
     * @param pendingPost the queued post to deliver
     */
    void invokeSubscriber(PendingPost pendingPost) {
        // Read both fields before releasing the PendingPost.
        Object queuedEvent = pendingPost.event;
        Subscription target = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (!target.active) {
            return;
        }
        invokeSubscriber(target, queuedEvent);
    }
    类图:


    参考:



















  • 相关阅读:
    L1-050 倒数第N个字符串 (15分)
    Oracle存储过程的疑难问题
    Linux的细节
    Linux字符设备和块设备的区别
    Shell变量
    游标的常用属性
    Oracle中Execute Immediate用法
    Oracle中的sqlerrm和sqlcode
    Oracle把一个表的数据复制到另一个表中
    Oracle的差异增量和累积增量
  • 原文地址:https://www.cnblogs.com/yutianran/p/5069629.html
Copyright © 2011-2022 走看看