EventBus 源码分析

前言

EventBus 是一款针对 Android 优化的发布-订阅事件总线。它简化了应用程序内各组件间的通信,开销小,代码更简洁,以及将发送者和接受者解耦。

本文将从源码的角度分析 EventBus 的实现原理,分析版本为 EventBus-3.0.0。

EventBus 的使用

在分析 EventBus 的源码前,先简单回顾一下 EventBus的使用。它的使用分为以下几个步骤:

  1. 定义一个事件类(实体类)

    1
    2
    3
    public class MessageEvent {
    ...
    }
  2. 事件订阅者进行事件注册

    1
    EventBus.getDefault().register(subscriber);
  3. 事件订阅者处理事件

    1
    2
    3
    4
    @Subscribe(threadMode = ThreadMode.MAIN)
    public void onMessageEventCome(MessageEvent event) {
    // 处理事件
    }
  4. 事件发布者发送事件

    1
    EventBus.getDefault().post(event);
  5. 事件订阅者取消事件订阅

    1
    EventBus.getDefault().unregister(subscriber);

下面将针对这几个步骤进行源码分析

源码分析

获得 EventBus 实例

在进行订阅和发送事件的时候,都需要先通过 EventBus.getDefault() 获得 EventBus 实例。

现在看一下 EventBus 的 getDefault 方法:

1
2
3
4
5
6
7
8
9
10
11
12
static volatile EventBus defaultInstance;

public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}

可以看到,EventBus 是一个单例,采用了 DCL 方式实现。

再看下 EventBus 的构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();

public EventBus() {
this(DEFAULT_BUILDER);
}

EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
logSubscriberExceptions = builder.logSubscriberExceptions;
logNoSubscriberMessages = builder.logNoSubscriberMessages;
sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
throwSubscriberException = builder.throwSubscriberException;
eventInheritance = builder.eventInheritance;
executorService = builder.executorService;
}

可以看到,在初始化时采用了 Builder 模式

订阅者进行注册

事件订阅者要想接收到事件,必须先进行注册:

1
EventBus.getDefault().register(subscriber);

看下 register 方法的实现:

EventBus#register

1
2
3
4
5
6
7
8
9
10
11
12
  public void register(Object subscriber) {
// 获取到订阅者的 Class 对象
Class<?> subscriberClass = subscriber.getClass();
// 获得订阅者的所有订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// 订阅所有方法
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}

在该方法中,首先获取到订阅者的 Class 对象,然后根据 Class 对象获得订阅者的所有订阅方法,最后订阅这些方法。

先看一下 SubscriberMethodFinder 的 findSubscriberMethods 方法:

SubscriberMethodFinder#findSubscriberMethods

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
  List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 首先查看是否有缓存,如果有缓存就获取缓存并返回
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}

// ignoreGeneratedIndex 属性表示是否忽略注解器生成的 MyEventBusIndex
// 该值默认为 false,可以通过配置 EventBusBuilder 来设置该属性
if (ignoreGeneratedIndex) {
// 利用反射来读取订阅者的订阅方法信息
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 该方法利用注解器生成的 MyEventBusIndex 获得订阅者的订阅方法信息
subscriberMethods = findUsingInfo(subscriberClass);
}

if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 如果获取成功就添加进缓存
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}

该方法首先查看是否有缓存,如果有缓存就从缓存中获取并返回。没有缓存的话,根据情况,从反射或从注解器生成的 MyEventBusIndex 中获得订阅者的订阅方法信息。最后如果获取成功就添加进缓存并返回,否则抛出异常。

下面分别看下两种获取订阅方法信息的方式,首先看反射的方式,这种方式调用 findUsingReflection 方法:

SubscriberMethodFinder#findUsingReflection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
  private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 获得 FindState 对象(FindState 是 SubscriberMethodFinder 的内部类)
FindState findState = prepareFindState();
// FindState 绑定订阅类
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 使用反射方式获取单个类的订阅方法
findUsingReflectionInSingleClass(findState);
// 将 clazz 指向父类的 Class,继续获取
findState.moveToSuperclass();
}
// 返回订阅者及其父类的订阅方法,并释放资源
return getMethodsAndRelease(findState);
}

该方法创建了一个 FindState 对象,用于辅助获取订阅方法,重点看 findUsingReflectionInSingleClass 方法,该方法使用反射方式获取单个类的订阅方法。

SubscriberMethodFinder#findUsingReflectionInSingleClass

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 先通过反射获得订阅者的所有方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
// 对方法进行筛选,找到订阅方法
for (Method method : methods) {
int modifiers = method.getModifiers();
// 忽略非 public 方法和 static、abstract等修饰的方法
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
// 订阅方法只能有一个参数
if (parameterTypes.length == 1) {
// 订阅方法有 Subscribe 注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
// 得到事件(实体类)的类型
Class<?> eventType = parameterTypes[0];
// 检查事件类型
// 尽管 EventBus 允许有多个事件类型相同的方法,但一般不这么做
if (findState.checkAdd(method, eventType)) {
// 添加订阅方法到 FindState 中
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}

该方法先通过反射获得订阅者的所有方法,再对这些方法进行筛选,找出订阅方法。

筛选规则为:

  1. 忽略非 public 方法和 static、abstract等修饰的方法
  2. 订阅方法只能有一个参数
  3. 订阅方法有 Subscribe 注解
  4. 子类继承并重写了父类的订阅方法,那么只会添加子类的方法,父类的方法会忽略

看完反射方式获取订阅方法,下面看一下另一种方式,这种方式从注解器生成的 MyEventBusIndex 中获取,调用的是 findUsingInfo 方法:

SubscriberMethodFinder#findUsingInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 通过在 EventBusBuilder 配置的 MyEventBusIndex 获取信息
findState.subscriberInfo = getSubscriberInfo(findState);
// 获取成功的话再对信息筛选
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 获取失败就利用反射方式获取
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}

该方法也是先获得 FindState 对象,然后通过在 EventBusBuilder 配置的 MyEventBusIndex 获取信息,如果获取失败就利用反射方式获取。

所以在默认情况下,如果我们没有专门配置 MyEventBusIndex,那么都是通过反射方式来获取订阅方法的。

获取订阅方法就分析到这里,下面回到 register:

1
2
3
4
5
6
7
8
// 获得订阅者的所有订阅方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
// 订阅所有方法
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}

在获取到订阅方法后,会遍历各方法,调用 subscribe 方法进行订阅。下面看一下 subscribe 方法:

EventBus#subscribe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
  private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
// 获得事件类型
Class<?> eventType = subscriberMethod.eventType;
// 根据订阅者和订阅方法创建一个 Subscription(订阅对象)
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

// 获取到该事件类型的订阅对象集合
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
// 没有获取到就新建一个并加入到 Map 中
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
}
// 不能重复注册事件
else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}

// 根据优先级添加新的 Subscription
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}

// 获取到订阅者的事件类型集合,若没有则新建并添加
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
// 将当前事件类型添加进订阅者的事件类型集合中
subscribedEvents.add(eventType);

// 对 sticky 事件的处理
// 从 sticky 事件保存队列中取出该事件类型的事件发送给当前订阅者
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}

subscribe 方法主要做了这几件事:

  1. 获得事件类型,并根据订阅者和订阅方法创建一个 Subscription 对象
  2. 根据优先级将 Subscription 添加到当前事件类型对应的 Subscription 集合。注意不能重复添加,否则会抛出异常
  3. 将当前事件类型添加到订阅者的事件类型集合
  4. 对 sticky 事件的处理:从 sticky 事件保存队列中取出该事件类型的事件并发送给当前订阅者

小结

订阅者进行注册的步骤如下:

  1. 根据订阅者的 Class 对象获得订阅者的所有订阅方法。获取的时候先从缓存中获取,没有的话就通过反射或从注解器生成的 MyEventBusIndex 中获取,默认情况下先从 MyEventBusIndex 中获取,获取不到再采取反射方式,先通过反射获取到所有方法,然后再根据修饰符、参数个数、注解筛选出订阅方法。
  2. 对每个订阅方法进行订阅,订阅步骤如下:
    1. 获得事件类型,并根据订阅者和订阅方法创建一个 Subscription 对象
    2. 根据优先级将 Subscription 添加到当前事件类型对应的 Subscription 集合
    3. 将当前事件类型添加到订阅者的事件类型集合
    4. 对 sticky 事件的处理:从 sticky 事件保存队列中取出对应事件并发送给订阅者

发布事件

事件发布者通过 post 或 postSticky 方法发布事件,例如:

1
EventBus.getDefault().post(event);

先看 post 方法:

EventBus#post

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
  public void post(Object event) {
// 获取当前线程的 Posting 状态
PostingThreadState postingState = currentPostingThreadState.get();
// 将当前事件插入到事件队列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

// 判断当前线程是否正在发布事件
if (!postingState.isPosting) {
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
// 处理队列中的事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}

EventBus 用 ThreadLocal 存储每个线程的 PostingThreadState。post 事件时,会将事件添加到当前线程的事件队列中,先等待前面的事件处理完毕,接着将事件交给 postSingleEvent 处理:

EventBus#postSingleEvent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;

// eventInheritance 表示是否向上查找事件的父类,默认为 true
if (eventInheritance) {
// 找到事件所有父类的类型
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 发布事件
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}

// 找不到该事件时,抛出异常
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}

根据 eventInheritance 属性判断是否还要找出发布事件的所有父类。最后继续调用 postSingleEventForEventType 方法发布事件:

EventBus#postSingleEventForEventType

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
  private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
// 得到事件类型对应的 Subscription 对象集合
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍历 Subscription
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
// 发布事件到具体的订阅者
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}

先得到事件类型对应的 Subscription 对象集合,遍历该集合,发布事件到具体的订阅者。发布过程调用 postToSubscription 方法:

EventBus#postToSubscription

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
  private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 根据订阅方法的 threadMode 进行相应操作
switch (subscription.subscriberMethod.threadMode) {
// 事件在哪个线程发布,事件处理方法就会在哪个线程执行
case POSTING:
invokeSubscriber(subscription, event);
break;
// 事件的处理会在 UI 线程执行
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
// 如果事件是在 UI 线程发布,事件就会在一个新的线程里处理
// 如果事件在子线程发布,那么事件就在发布线程执行
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
// 无论在哪个线程发布,事件处理都会在一个新线程里执行
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}

根据订阅方法的 threadMode 在相应的线程进行事件处理。进行事件处理调用的是 invokeSubscriber 方法:

EventBus#invokeSubscriber

1
2
3
4
5
6
7
8
9
10
 void invokeSubscriber(Subscription subscription, Object event) {
try {
// 调用订阅者的订阅方法,事件作为参数传入
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}

最终,在 invokeSubscriber 方法中调用了订阅者的订阅方法并将事件作为参数传递过去。

小结

发布事件的步骤如下:

  1. post 事件后,先将事件添加到当前线程的事件队列中,等待前面的事件处理完毕。
  2. 根据 eventInheritance 属性判断是否还要将事件发送给带有事件父类的订阅方法。
  3. 得到事件类型,根据事件类型得到所有的订阅者和订阅方法信息。
  4. 根据订阅方法的 threadMode,在相应的线程里,利用反射的方式调用订阅者的订阅方法并将事件作为参数传递过去。

订阅者取消注册

取消注册调用的是 unregister 方法:

EventBus#unregister

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
  public synchronized void unregister(Object subscriber) {
// 获取到当前订阅者的所有事件类型
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
// 移除当前订阅者的所有订阅方法对应的 Subscription
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
} else {
Log.w(TAG, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 事件类型对应的所有 Subscription
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
// 销毁并移除对应订阅者和订阅方法的 Subscription
if (subscription.subscriber == subscriber) {
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}

总的来说,取消注册过程就是把注册过程保存起来的当前订阅者和订阅方法信息都移除。

设计模式

观察者模式

观察者模式:定义了对象间的一对多依赖关系,每当一个对象改变状态时,它的所有依赖者都会得到通知并自动更新。

对于 EventBus 来说,事件类是被观察者,订阅者类是观察者。当事件出现的时候,会通知观察者,使得观察者的订阅方法能够自动调用。

EventBus 又和一般的观察者模式不太一样,它并不需要事件类保存观察者名单和通知观察者,因为这样对于每一个事件类都要实现相同的操作,就太过繁琐和低效率了。因此,EventBus 就充当了被观察者和观察者的桥梁,把事件的很多责任都抽离出来,事件本身只需要存储数据,别的交给 EventBus 就行了。

参考

-------------    本文到此结束  感谢您的阅读    -------------
0%