基于Rxjava2的事件总线:Rxbus
2017-05-05 15:57
323 查看
以前的项目中使用的是EventBus来实现事件的通知和订阅,RxJava2发布之后就使用了新的方式:RxBus,减少添加的依赖库。如果有什么错误的地方,或者有更好的建议的欢迎大家在下边留言,互相学习。
在发送消息的activity中代码:
在接收消息的activity中代码:
像上边直接传基本数据类型在实际项目中不推荐这样使用。我们可以自定义消息类(或者直接传JavaBean),例如:
在发送消息的时候,自己定义消息:
在接收消息的时候,选择性接收消息:
这里说明一下unregisterAll()方法,这个方法一旦调用了以后,所有的消息都是收不到的,所以如果要调用的话,建议在退出程序的Activity里面调用。
效果图:
点击下载源码
没有背压处理(Backpressure)的 RxBus
import android.support.annotation.NonNull; import io.reactivex.Observable; import io.reactivex.subjects.PublishSubject; import io.reactivex.subjects.Subject; public class RxBus { private final Subject<Object> mBus; private RxBus() { mBus = PublishSubject.create().toSerialized(); } public static RxBus getInstance() { return Holder.BUS; } public void post(@NonNull Object obj) { mBus.onNext(obj); } public <T> Observable<T> register(Class<T> tClass) { return mBus.ofType(tClass); } public Observable<Object> register() { return mBus; } public boolean hasObservers() { return mBus.hasObservers(); } public void unregisterAll() { //会将所有由mBus生成的Observable都置completed状态,后续的所有消息都收不到了 mBus.onComplete(); } private static class Holder { private static final RxBus BUS = new RxBus(); } }
有背压(Backpressure)处理的RxBus:
import android.support.annotation.NonNull; import io.reactivex.Flowable; import io.reactivex.processors.FlowableProcessor; import io.reactivex.processors.PublishProcessor; public class RxBus { private final FlowableProcessor<Object> mBus; private RxBus() { mBus = PublishProcessor.create().toSerialized(); } private static class Holder { private static RxBus instance = new RxBus(); } public static RxBus getInstance() { return Holder.instance; } public void post(@NonNull Object obj) { mBus.onNext(obj); } public <T> Flowable<T> register(Class<T> clz) { return mBus.ofType(clz); } public Flowable<Object> register() { return mBus; } public void unregisterAll() { //会将所有由mBus生成的Flowable都置completed状态后续的所有消息都收不到了 mBus.onComplete(); } public boolean hasSubscribers() { return mBus.hasSubscribers(); } }
在发送消息的activity中代码:
RxBus.getInstance().post("111");
在接收消息的activity中代码:
RxBus.getInstance().register(String.class).subscribe(new Consumer<String>() { @Override public void accept(String integer) throws Exception { toast(integer); } });
像上边直接传基本数据类型在实际项目中不推荐这样使用。我们可以自定义消息类(或者直接传JavaBean),例如:
public class MsgEvent<T> { private T data; private String mMsg; private int type; private int request; public MsgEvent(T data) { this.data = data; } public MsgEvent(int request, int type, String msg) { this.type = type; this.mMsg = msg; this.request = request; } public String getMsg(){ return mMsg; } public int getType(){ return type; } public int getRequest(){ return request; } public T getData(){return data;} }
在发送消息的时候,自己定义消息:
RxBus.getInstance().post(new MsgEvent(11,45,"今天天气很好"));
在接收消息的时候,选择性接收消息:
RxBus.getInstance().register(MsgEvent.class).subscribe(new Consumer<MsgEvent>() { @Override public void accept(MsgEvent msg) throws Exception { if (msg.getRequest() == 11) { tv.setText(msg.getMsg()); } } });
这里说明一下unregisterAll()方法,这个方法一旦调用了以后,所有的消息都是收不到的,所以如果要调用的话,建议在退出程序的Activity里面调用。
public void unregisterAll() { //会将所有由mBus生成的Observable都置completed状态,后续的所有消息都收不到了 mBus.onComplete(); }
效果图:
点击下载源码
相关文章推荐
- Rxjava2使用-构建事件总线(RxBus)代替原生广播
- 基于RxJava2后的RxBus的快速使用
- 事件总线 —— otto的bus和eventbus对比分析
- 事件总线分发库EventBus框架的简单使用
- 使用Vue开发网站之路2-多组件通信1(利用bus总线进行事件触发)
- RxBus的使用(基于RxJava2.0)
- Android事件总线之EventBus3.0基本使用
- 基于WCF和MSMQ构建发布/订阅消息总线(Pub/Sub Message Bus)
- Rx实现事件总线(类似于EventBus)总结
- 事件总线之EventBus
- Orchard EventBus 事件总线及 IEventHandler作用
- 基于Retrofit+Okio+RxBus实现文件下载(带下载进度)
- 基于Retrofit+Okio+RxBus实现文件下载(带下载进度)
- Guava异步事件总线AsyncEventBus的注解AllowConcurrentEvents分析
- 基于NoHttp+RxBus实现文件下载(带下载进度)
- ABP的事件总线和领域事件(EventBus & Domain Events)
- Android事件总线框架之EventBus(3.0为例)
- 基于Retrofit+Okio+RxBus实现文件下载(带下载进度)
- RxBus进阶------基于RxJava2.x实现以注解的方式传递消息
- 使用kafka作为springcloud bus的消息总线,以及如何自定义总线事件