## 前言
大部分框架都是事件订阅功能,即观察者模式,或者叫事件机制。通过订阅某个事件,当触发事件时,回调某个方法。该功能非常的好用,而 SOFA 内部也设计了这个功能,并且内部大量使用了该功能。来看看是如何设计的。
源码分析
核心类有 3 个:
- EventBus 事件总线
- Event 事件,即被观察者
- Subscriber 订阅者,即观察者
Subscriber 是个抽象类, 子类需要自己实现 onEvent 方法,即回调方法。还有一个是否同步执行的参数。
EventBus 类实现了注册功能,反注册功能(删除)。事件发生时通知订阅者功能。
内部使用一个“大型数据结构”保存事件和订阅者的信息。
1
| ConcurrentHashMap<Class<? extends Event>, CopyOnWriteArraySet<Subscriber>> SUBSCRIBER_MAP
|
所有相关信息都保存在该数据结构中。
看看注册功能。
1 2 3 4 5 6 7 8 9 10 11
| public static void register(Class<? extends Event> eventClass, Subscriber subscriber) { CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass); if (set == null) { set = new CopyOnWriteArraySet<Subscriber>(); CopyOnWriteArraySet<Subscriber> old = SUBSCRIBER_MAP.putIfAbsent(eventClass, set); if (old != null) { set = old; } } set.add(subscriber); }
|
参数为 一个事件对象,一个订阅对象。
首先从 Map 中根据事件的 Class 获取对应的订阅者集合,注意,这里都是用的并发容器。
下面的判断有点意思,考虑到并发的情况,如果第一次获取 Set 是 null,则尝试创建一个并放进 Map,这里使用的并不是 put 方法,而是 putIfAbsent 方法,该方法作用等同于:
1 2 3 4
| if (!map.containsKey(key)) return map.put(key, value); else return map.get(key);
|
所以,这里再一次考虑并发问题,如果这个间隙有其他线程 put 了,就可以获取到那个线程 put 的 Set。很谨慎。而且性能相比较锁要好很多。虽然这个方法并发量不会很高,但也是一种性能优化。
如果发生了并发,就使用已有的 Set,然后将 Set 放置到 Map 中,完成事件和订阅者的映射。
再看看取消注册方法。
1 2 3 4 5 6
| public static void unRegister(Class<? extends Event> eventClass, Subscriber subscriber) { CopyOnWriteArraySet<Subscriber> set = SUBSCRIBER_MAP.get(eventClass); if (set != null) { set.remove(subscriber); } }
|
很简单,就是直接删除。
再看看通知功能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void post(final Event event) { if (!isEnable()) { return; } CopyOnWriteArraySet<Subscriber> subscribers = SUBSCRIBER_MAP.get(event.getClass()); if (CommonUtils.isNotEmpty(subscribers)) { for (final Subscriber subscriber : subscribers) { if (subscriber.isSync()) { handleEvent(subscriber, event); } else { AsyncRuntime.getAsyncThreadPool().execute( new Runnable() { @Override public void run() { handleEvent(subscriber, event); } }); } } } }
|
首先看是否开启了总线功能,在性能测试的时候,可能是关闭的。
如果开启了,就根据给定的时间找到订阅者,循环调用 handleEvent 方法(其实就是调用订阅者的 onEvent 方法)。
这里有一个是否异步的判断,如果异步的,则在异步线程池执行。
这个异步线程池 AsyncRuntime 可以看一下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public static ThreadPoolExecutor getAsyncThreadPool(boolean build) { if (asyncThreadPool == null && build) { synchronized (AsyncRuntime.class) { if (asyncThreadPool == null && build) { int coresize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_CORE); int maxsize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_MAX); int queuesize = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_QUEUE); int keepAliveTime = RpcConfigs.getIntValue(RpcOptions.ASYNC_POOL_TIME);
BlockingQueue<Runnable> queue = ThreadPoolUtils.buildQueue(queuesize); NamedThreadFactory threadFactory = new NamedThreadFactory("SOFA-RPC-CB", true);
RejectedExecutionHandler handler = new RejectedExecutionHandler() { private int i = 1;
@Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (i++ % 7 == 0) { i = 1; if (LOGGER.isWarnEnabled()) { LOGGER.warn("Task:{} has been reject because of threadPool exhausted!" + " pool:{}, active:{}, queue:{}, taskcnt: {}", r, executor.getPoolSize(), executor.getActiveCount(), executor.getQueue().size(), executor.getTaskCount()); } } throw new RejectedExecutionException("Callback handler thread pool has bean exhausted"); } }; asyncThreadPool = ThreadPoolUtils.newCachedThreadPool( coresize, maxsize, keepAliveTime, queue, threadFactory, handler); } } } return asyncThreadPool; }
|
这里也做了双重检查锁。
默认核心线程大小 10,最大 200, 队列大小 256, 回收时间 60 秒。
因此,获取的队列就是 LinkedBlockingQueue。
这里的拒绝策略很有意思,每失败 6 次,打印详细信息,当前线程数,活动线程数量,队列 size, 任务总数,不知道为什么这么设计(6次??)。
目前框架中 Event 的实现很多,我们在之前的源码分析中也看到很多了。而订阅者目前只有一个 FaultToleranceSubscriber。用于容错处理。是 FaultToleranceModule 模块的功能。该功能也是个扩展点,当系统初始化的时候,会注册 ClientSyncReceiveEvent 事件和 ClientAsyncReceiveEvent。
总结
这个事件总线功能真是观察者模式的最佳实践,通过系统中发生的事件,能够让外部模块感知到并进行处理,比如上面介绍的容错模块。当发生订阅的事件后,外部模块能够响应,很完美。