[大白装逼]YEventBus事件总线的实现
2018-03-25 11:57
411 查看
Y事件总线:基于java的Observe和Observable实现的事件总线
github地址:https://github.com/lewis-v/YEventBus
Add the dependency
发布事件
取消订阅
然后是java的Observer,这里实现了OnGetEvent接口,主要是要在本来的Observer接口上加上成功与失败的调用方法,其中Observer接口需要实现updata方法,此方法是在事件分发时调用的方法
YObserver控制了事件的实际处理及异常的获取
这里除了提供YObserver,还提供了YMainThreadObserver,此Observer的事件处理会在主线程中进行,添加此类的意义是,可以再发布时指定在主线程,也可以在订阅的时候指定在主线程,当然在订阅的时候指定的优先级比发布的时候指定优先级高.
事件的分发处理,会在一个线程池里进行,线程池的大小为Cpu核心数的2倍,当事件过多时会在线程池的队列中等待,需要注意的是对事件的处理尽量不要做太耗时的任务,不然把线程池中的所有线程都阻塞了会导致整个事件总线阻塞,后面的时间将无法继续发布.
github地址:https://github.com/lewis-v/YEventBus
使用方式
导入依赖
Add it in your root build.gradle at the end of repositories:allprojects { repositories { ... maven { url 'https://jitpack.io' } } }
Add the dependency
dependencies { compile 'com.github.lewis-v:YEventBus:1.0.0' }
使用方式
定义事件类TestEvent2继承于IEvent,并注册事件YEventBus.getInstance().subscriber(TestEvent2.class, new YObserver<TestEvent2>() {//订阅事件,处理的所在的线程与分发的线程一致 @Override public void onSuccess(TestEvent2 event) { Log.i(TAG,event.toString()); } @Override public void onFail(Exception e) { Log.e(TAG,e.getMessage()); } }); YEventBus.getInstance().subscriber(TestEvent.class, new YMainThreadObserver<TestEvent>() {//订阅事件,会在主线程中处理 @Override public void onSuccess(TestEvent event) { Log.i(TAG,event.toString()); } @Override public void onFail(Exception e) { Log.e(TAG,e.getMessage()); } });
发布事件
YEventBus.getInstance().postMainEvent(TestEvent.class,new TestEvent(TAG));//发布在主线程分发的事件 YEventBus.getInstance().postEvent(TestEvent.class,new TestEvent(TAG));//发布在子线程分发的事件
取消订阅
YEventBus.getInstance().unSubscriber(TestEvent.class,observer);//取消某事件下的某个订阅者的订阅 YEventBus.getInstance().unSubscriberEvent(TestEvent.class);//取消TestEvent整个系列事件的订阅 YEventBus.getInstance().unSubscriberAll();//取消所有事件的订阅
具体实现
Observable与Observer
首先是使用java的Observable,在发布事件时需要先setChanged()在进行发布,否者是发布不了的public class YObservable extends Observable { public <T extends IEvent> void postEvent(T data){ setChanged(); notifyObservers(data); } }
然后是java的Observer,这里实现了OnGetEvent接口,主要是要在本来的Observer接口上加上成功与失败的调用方法,其中Observer接口需要实现updata方法,此方法是在事件分发时调用的方法
interface OnGetEvent<E extends IEvent> extends Observer{ void onSuccess(E event); void onFail(Exception e); }
YObserver控制了事件的实际处理及异常的获取
public abstract class YObserver<E extends IEvent> implements OnGetEvent<E> { @Override public void update(Observable o, Object arg) { try { onSuccess((E) arg); }catch (Exception e){ onFail(e); } } }
这里除了提供YObserver,还提供了YMainThreadObserver,此Observer的事件处理会在主线程中进行,添加此类的意义是,可以再发布时指定在主线程,也可以在订阅的时候指定在主线程,当然在订阅的时候指定的优先级比发布的时候指定优先级高.
public abstract class YMainThreadObserver<E extends IEvent> implements OnGetEvent<E>{ @Override public void update(final Observable o, final Object arg) { ThreadSchedule.getMainHandle().post(new Runnable() { @Override public void run() { try { onSuccess((E) arg); }catch (Exception e){ onFail(e); } } }); } }
Observable管理类
YObservableManager用于管理Observable,内部定义了ConcurrentHashMap来存储Observable,其键值为对应事件的Class,在订阅和取消订阅会对map进行插入或遍历public class YObservableManager { private ConcurrentHashMap<Class,YObservable> mObservableMap; private IEventHandle handle; public YObservableManager() { mObservableMap = new ConcurrentHashMap<>(); init(); } public YObservableManager(ConcurrentHashMap<Class, YObservable> mObservableMap) { this.mObservableMap = mObservableMap; init(); } public void init(){ handle = new YEventHandle(); } /** * 设置自定义的事件分发处理 * @param handle */ public void setHandle(IEventHandle handle) { this.handle = handle; } /** * 发布消息 * @param event * @param <T> */ public <T extends IEvent> void postEvent(Class<T> event,T data){ YObservable observables = mObservableMap.get(event); if (handle == null){ init(); } handle.postEvent(observables,data); } /** * 发布主线程消息 * @param event * @param <T> */ public <T extends IEvent> void postMainEvent(Class<T> event,T data){ YObservable observables = mObservableMap.get(event); if (handle == null){ init(); } handle.postMainEvent(observables,data); } /** * 订阅事件 * @param event * @param observer * @param <T> */ public <T extends IEvent> void subscriber(Class<T> event,OnGetEvent<T> observer){ if (mObservableMap.containsKey(event)){ mObservableMap.get(event).addObserver(observer); }else { YObservable observable = new YObservable(); observable.addObserver(observer); mObservableMap.put(event, observable); } } /** * 解除订阅 * @param event * @param observer * @param <T> */ public <T extends IEvent> void unSubscriber(Class<T> event,YObserver<T> observer){ if (mObservableMap.containsKey(event)){ mObservableMap.get(event).deleteObserver(observer); } } /** * 解除一个事件系列的订阅 * @param event */ public void unSubscriberEvent(Class<? extends IEvent> event){ if (mObservableMap.containsKey(event)) { ba72 mObservableMap.get(event).deleteObservers(); mObservableMap.remove(event); } } /** * 解除所有事件订阅 */ public void unSubscriberAll(){ for (Map.Entry<Class,YObservable> entry : mObservableMap.entrySet()){ YObservable value = entry.getValue(); if (value != null){ value.deleteObservers(); } } mObservableMap.clear(); } /** * 释放资源 */ public void destroy(){ handle.destroy(); handle = null; unSubscriberAll(); } }
事件的发布
上述代码中,在发布消息的时候会调用IEventHandle的postEvent,其实际的实现为public class YEventHandle implements IEventHandle{ private ExecutorService executorServiceHandle;//处理线程池 public YEventHandle() { init(); } private void init(){ executorServiceHandle = Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors()); } /** * 发布消息 * @param observable * @param data * @param <T> * @throws InterruptedException */ @Override public <T extends IEvent> void postEvent(YObservable observable, T data) { handle(observable,data); } /** * 发布主线程处理消息 * @param observable * @param data * @param <T> */ @Override public <T extends IEvent> void postMainEvent(YObservable observable, T data) { handleInMain(observable,data); } /** * 处理 * @param observable * @param data * @param <T> */ private <T extends IEvent> void handle(final YObservable observable, final T data){ executorServiceHandle.execute(new Runnable() { @Override public void run() { if (observable != null) { observable.postEvent(data); } } }); } /** * 在主线程处理 * @param observable * @param data * @param <T> */ private <T extends IEvent> void handleInMain(final YObservable observable, final T data){ executorServiceHandle.execute(new Runnable() { @Override public void run() { if (observable != null) { ThreadSchedule.getMainHandle().post(new Runnable() { @Override public void run() { observable.postEvent(data); } }); } } }); } /** * 释放资源 */ @Override public void destroy() { executorServiceHandle.shutdownNow(); executorServiceHandle = null; } }
事件的分发处理,会在一个线程池里进行,线程池的大小为Cpu核心数的2倍,当事件过多时会在线程池的队列中等待,需要注意的是对事件的处理尽量不要做太耗时的任务,不然把线程池中的所有线程都阻塞了会导致整个事件总线阻塞,后面的时间将无法继续发布.
小结结
Y事件总线的实现只要是使用了java的Observable和Observer,其内部也是使用一个Vector类保存Observer,在发布的时候,遍历这里列表进行发布,这也是设计模式中的观察与被观察者的模式.相关文章推荐
- Android事件总线框架之EventBus(3.0为例)
- 事件总线分发库EventBus框架的简单使用
- ABP的事件总线和领域事件(EventBus & Domain Events)
- Orchard EventBus 事件总线及 IEventHandler作用
- 事件总线之EventBus
- Android事件总线之EventBus3.0基本使用
- EventBus StickyEvent(粘性事件) 的应用场景! 不需要手动注销事件总线
- Guava异步事件总线AsyncEventBus的注解AllowConcurrentEvents分析
- android 事件总线 -- Otto(五) EventHandler、EventProducer、AnnotatedHandlerFinder
- 单例模式+观察者模式,60行代码简单实现事件总线。
- js玩具——UI组件:PropertyChangeEventListener 属性改变事件监听器及默认实现
- android 事件总线 -- Otto(四) HandlerFinder、DeadEvent
- android Service+EventBus实现异地登录提示
- spark学习-48-Spark的event事件监听器LiveListenerBus和特质SparkListenerBus以及特质ListenerBus
- android实现N滴雨(采用onTouchEvent触摸事件和线程)
- 点击事件处理, 以及hitTest:withEvent:实现
- python多线程threading事件对象event实现线程阻塞及timer时间对象
- 使用事件等待句柄EventWaitHandler 实现生产者、消费者队列
- Android 框架学习2:源码分析 EventBus 3.0 如何实现事件总线
- 利用RxJava实现的事件总线(Event Bus)