您的位置:首页 > 编程语言 > Java开发

带你入门学习Rxjava--上手教程

2017-06-08 18:28 531 查看
相信各位看官对RxJava早有耳闻,那么关于什么是RxJava我就不再赘述了,不知道的可自行百度。网上的RxJava的入门门槛稍高,但入门不难,可以跟本文一起来学习
注: 本文针对rxjava 1.x.x ,用例为下,建议初学先从1.x看起
    compile 'io.reactivex:rxjava:1.1.6'

    compile 'io.reactivex:rxandroid:1.2.1'


官方的介绍

1.支持Java6+

2.android 2.3+

3.异步的

4.基于观察者设计模式(Observer、Observable)

5.Subscribe (订阅)

之前不了解设计模式--观察者模式的先来一个简单例子,懂得可以跳过


观察者模式的简单介绍:

观察者模式很好理解,类似于邮件订阅和RSS订阅,当我们浏览一些博客或wiki时,经常会看到RSS图标,就这的意思是,当你订阅了该文章,如果后续有更新,会及时通知你。

其实,简单来讲就一句话:当一个对象变化时,其它依赖该对象的对象都会收到通知,并且随着变化.对象之间是一种一对多的关系。先来看看关系图:



MySubject类就是我们的主对象,Observer1和Observer2是依赖于MySubject的对象,当MySubject变化时,Observer1和Observer2必然变化。AbstractSubject类中定义着需要监控的对象列表,可以对其进行修改:增加或删除被监控对象,且当MySubject变化时,负责通知在列表内存在的对象。我们看实现代码:

Observer两个实现类:



public interface Observer {   

   public void update();   



public class Observer1 implements Observer {   

   

   @Override   

   public void update() {   

      System.out.println("observer1 has received!");   

   }   

}


public class Observer2 implements Observer {   

   

   @Override   

   public void update() {   

      System.out.println("observer2 has received!");   

   }   

   



Subject接口及实现类
public interface Subject {   

      

   /*增加观察者*/   

   public void add(Observer observer);   

      

   /*删除观察者*/   

   public void del(Observer observer);   

      

   /*通知所有的观察者*/   

   public void notifyObservers();   

      

   /*自身的操作*/   

   public void operation();   

}   


public abstract class AbstractSubject implements Subject {   

   

   private Vector<Observer> vector = new Vector<Observer>();   

   @Override   

   public void add(Observer observer) {   

      vector.add(observer);   

   }   

   

   @Override   

   public void del(Observer observer) {   

      vector.remove(observer);   

   }   

   

   @Override   

   public void notifyObservers() {   

      Enumeration<Observer> enumo = vector.elements();   

      while(enumo.hasMoreElements()){   

         enumo.nextElement().update();   

      }   

   }   

}   


public class MySubject extends AbstractSubject {   

   

   @Override   

   public void operation() {   

      System.out.println("update self!");   

      notifyObservers();   

   }   

   

}   


public class ObserverTest {   

   

   public static void main(String[] args) {   

      Subject sub = new MySubject();   

      sub.add(new Observer1());   

      sub.add(new Observer2());   

         

      sub.operation();   

   }   

   

}   


输出:
update self!

observer1 has received!

observer2 has received!


好啦 ,正式进入Rxjava

先创建个数据发射源,很好理解,就是发射数据用的:被观察者
   Observable<String> sender = Observable.create(new Observable.OnSubscribe<String>() {

        @Override
        public void call(Subscriber<? super String> subscriber) {

           subscriber.onNext("Hi!");  //发送数据"Hi!"
        }
     });


再创建个数据接收源,同理,接收数据用的:观察者
   Observer<String> receiver = new Observer<String>() {

        @Override
        public void onCompleted() {

           //数据接收完成时调用
        }

        @Override
        public void onError(Throwable e) {

           //发生错误调用
        }

        @Override
        public void onNext(String s) {

           //正常接收数据调用
           System.out.print(s);  //将接收到来自sender的问候"Hi!"
        }
     };


好了,将发射源和接收源关联起来:
 sender.subscribe(receiver);


这样就形成RxJava一个简单的用法,sender发射
"Hi!"
,将会被receiver的
onNext
的接收,通过这个例子,也许你会想到“异步”、“观察者模式”,没错,这些都是RxJava所做的事情,并且让他们变得更简单和简洁,而RxJava所有的一切都将围绕这两个点展开,一个是发射数据,一个是接收数据,如果你理解了这点或者你已经知道RxJava就是这么一回事,那么恭喜你,你已经一只脚跨进RxJava的大门了,如果不是也无所谓,请继续往下看...

网上关于RxJava的博文也有很多,但绝大部分文章都有一个共同点,就是侧重于讲RxJava中各种强大的操作符,而忽略了最基本的东西——概念,下面对发射源和接收源做个归类.


基本概念

Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;

Observer:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;

Subject:Subject是一个比较特殊的对象,既可充当发射源,也可充当接收源,为避免初学者被混淆,本章将不对Subject做过多的解释和使用,重点放在Observable和Observer上,先把最基本方法的使用学会,后面再学其他的都不是什么问题;

Subscriber:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法
unsubscribe(
)
,用来取消订阅,当你不再想接收数据了,可以调用
unsubscribe(
)
方法停止接收,Observer 在 
subscribe()
 过程中,最终也会被转换成
Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;

Subscription :Observable调用
subscribe(
)
方法返回的对象,同样有
unsubscribe( )
方法,可以用来取消订阅事件;

Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;

Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;


基本用法

Observable的创建

1.使用create( ),最基本的创建方式:
normalObservable = Observable.create(new Observable.OnSubscribe<String>() {
 @Override
 public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("create1"); //发射一个"create1"的String
    subscriber.onNext("create2"); //发射一个"create2"的String
    subscriber.onCompleted();//发射完成,这种方法需要手动调用onCompleted,才会回调Observer的onCompleted方法
 }});


2.使用just( ),将为你创建一个Observable并自动为你调用
onNext(
)
发射数据:
justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"


3.使用from( ),遍历集合,发送每个item:
List<String> list = new ArrayList<>();
list.add("from1");
list.add("from2");
list.add("from3");
fromObservable = Observable.from(list);  //遍历list 每次发送一个
/** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /


4.使用defer( ),有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable:
deferObservable = Observable.defer(new Func0<Observable<String>>() {
 @Override
 //注意此处的call方法没有Subscriber参数
 public Observable<String> call() {
    return Observable.just("deferObservable");
 }});


5.使用interval( ),创建一个按固定时间间隔发射整数序列的Observable,可用作定时器:
intervalObservable = Observable.interval(1, TimeUnit.SECONDS);//每隔一秒发送一次


6.使用range( ),创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常:
rangeObservable = Observable.range(10, 5);//将发送整数10,11,12,13,14


7.使用timer( ),创建一个Observable,它在一个给定的延迟后发射一个特殊的值,等同于Android中Handler的postDelay( )方法:
timeObservable = Observable.timer(3, TimeUnit.SECONDS);  //3秒后发射一个值


8.使用repeat( ),创建一个重复发射特定数据的Observable:
repeatObservable = Observable.just("repeatObservable").repeat(3);//重复发射3次


Observer的创建
mObserver = new Observer<String>() {
 @Override
 public void onCompleted() {
    LogUtil.log("onCompleted");
 }
 @Override
 public void onError(Throwable e) {
 }
 @Override
 public void onNext(String s) {
    LogUtil.log(s);
 }};


ok,有了Observable和Obsever,我们就可以随便玩了,任取一个已创建的Observable和Observer关联上,即形成一个RxJava的例子,如:
justObservable.subscribe(mObserver);


mObserver的
onNext
方法将会依次收到来自justObservable的数据
"just1"
"just2"
,另外,如果你不在意数据是否接收完或者是否出现错误,即不需要Observer的
onCompleted()
onError()
方法,可使用
Action1
subscribe()
支持将
Action1
作为参数传入,RxJava将会调用它的
call
方法来接收数据,代码如下:
justObservable.subscribe(new Action1<String>() {
   @Override
   public void call(String s) {

       LogUtil.log(s);
    }});


以上就是RxJava最简单的用法。

RxJava也以代码的简洁深受广大用户喜爱,简洁不能理解为代码量少,而是随着逻辑的复杂,需求的更改,代码可依然能保持极强的阅读性


RxJava的简单操作符


Scheduler

在讲常用操作符前,先看看Scheduler这个东西,名之为调度器,正因为有这个东西,让RxJava可以从主线程和子线程之间轻松切换,各个Scheduler的具体使用效果看以下表解释:


调度器类型用途
Schedulers.computation( )用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量
Schedulers.from(executor)使用指定的Executor作为调度器
Schedulers.immediate( )在当前线程立即开始执行任务
Schedulers.io( )用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.newThread( )为每个任务创建一个新线程
Schedulers.trampoline( )当其它排队的任务完成后,在当前线程排队开始执行
AndroidSchedulers.mainThread()此调度器为RxAndroid特有,顾名思义,运行在Android UI线程上
具体如何使用呢,比如从数据库读取数据更新到UI上,假设数据量很大,直接从主线程读取数据,会造成UI卡顿,以前我们常用AnsyTask或者Handler去处理避免出现这类问题,个人认为手写个AnsyTask还是挺麻烦的,但用RxJava就简单多了,例如:
Observable.create(new Observable.OnSubscribe<Data>() {
   @Override
   public void call(Subscriber<? super Data> subscriber) {
     Data data = getData();//从数据库获取
     subscriber.onNext(data);
     subscriber.onCompleted();
   }})
   .subscribeOn(Schedulers.io())
   .observeOn(AndroidSchedulers.mainThread())
   .subscribe(new Action1<Data>() {
        @Override
        public void call(Data data) {

              //更新ui
        }
     });


简单粗暴的解释一下,
subscribeOn( )
决定了发射数据在哪个调度器上执行,
observeOn(AndroidSchedulers.mainThread())
则指定数据接收发生在UI线程,简直不要太方便。


常用操作符

Map:最常用且最实用的操作符之一,将对象转换成另一个对象发射出去,应用范围非常广,如数据的转换,数据的预处理等。

例一:数据类型转换,改变最终的接收的数据类型。假设传入本地图片路径,根据路径获取图片的Bitmap。
Observable.just(filePath).map(new Func1<String, Bitmap>() {
 @Override
 public Bitmap call(String path) {

     return getBitmapByPath(path);
 }}).subscribe(new Action1<Bitmap>() {
  @Override
 public void call(Bitmap bitmap) {

       //获取到bitmap,显示
}});


例二:对数据进行预处理,最后得到理想型数据。实际开发过程中,从后台接口获取到的数据也许不符合我们想要的,这时候可以在获取过程中对得到的数据进行预处理(结合Retrofit)。
Observable.just("12345678").map(new Func1<String, String>() {
 @Override
 public String call(String s) {
    return s.substring(0,4);//只要前四位
 }})
.subscribe(new Action1<String>() {
 @Override
 public void call(String s) {
    Log.i("mytag",s);
 }});


先说明一下,为了方便理解,所以写的例子都比较简单,不要以为明明可以简单用if-else解决的事,没必要用这种方式去写,当你真正将这些操作符使用到数据处理中去的时候,你就会发现有多方便。

FlatMap:和Map很像但又有所区别,Map只是转换发射的数据类型,而FlatMap可以将原始Observable转换成另一个Observable。还是举例说明吧。假设要打印全国所有学校的名称,可以直接用Map:

为了更清晰一点,先贴一下School类:
public class School {

 private String name;
 private List<Student> studentList;

 public List<Student> getStudentList() {
    return studentList;
 }
 public void setStudentList(List<Student> studentList) {
    this.studentList = studentList;
 }
 public String getName() {
    return name;
 }
 public void setName(String name) {
    this.name = name;
 }
 public static class Student{
    private String name;
    public String getName() {
       return name;
    }
    public void setName(String name) {
       this.name = name;
    }
 }
}


接着用Map打印学校名称:
List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, String>() {
 @Override
 public String call(School school) {
     return school.getName();
 }}).subscribe(new Action1<String>() {
 @Override
 public void call(String schoolName) {
     Log.i("mytag",schoolName);
 }});


再进一步,打印学校所有学生的姓名,先考虑用Map实现,将所有School对象直接转成Student:
Observable.from(schoolList).map(new Func1<School, School.Student>() {
 @Override
 public School.Student call(School school) {
    return school.getStudentList();
 }}).subscribe(new Action1<School.Student>() {
 @Override
 public void call(School.Student student) {

       Log.i("mytag",student.getName());
 }});


看似可行,但事实上,这是一段错误的代码,细心的人就会发现错误的地方
@Override
public School.Student call(School school) {
 return school.getStudentList();  //错误,Student 是一个对象,返回的却是一个list
}


所以用Map是无法实现直接打印学校的所有学生名字的,因为Map是一对一的关系,无法将单一的School对象转变成多个Student。前面说到,FlatMap可以改变原始Observable变成另外一个Observable,如果我们能利用
from()
操作符把
school.getStudentList()
变成另外一个Observable问题不就迎刃而解了吗,这时候就该FlatMap上场了,来看看它是怎么实现的:
Observable.from(schoolList).flatMap(new Func1<School, Observable<School.Student>>() {
 @Override
 public Observable<School.Student> call(School school) {

    return Observable.from(school.getStudentList()); //关键,将学生列表以另外一个Observable发射出去

 }}).subscribe(new Action1<School.Student>() {

 @Override
 public void call(School.Student student) {
    Log.i("mytag",student.getName());
 }});


Map和FlatMap在我看来就像孪生兄弟一样,非常实用,实际开发中也我也经常使用,个人觉得要想上手RxJava,掌握这两个操作符必不可少。

Buffer:缓存,可以设置缓存大小,缓存满后,以list的方式将数据发送出去;例:
Observable.just(1,2,3).buffer(2).subscribe(new Action1<List<Integer>>() {
 @Override
 public void call(List<Integer> list) {
    Log.i("mytag","size:"+list.size());
 }});


运行打印结果如下:
11-02 20:49:58.370 23392-23392/? I/mytag: size:2
11-02 20:49:58.370 23392-23392/? I/mytag: size:1


在开发当中,个人经常将Buffer和Map一起使用,常发生在从后台取完数据,对一个List中的数据进行预处理后,再用Buffer缓存后一起发送,保证最后数据接收还是一个List,如下:
List<School> schoolList = new ArrayList<>();
Observable.from(schoolList).map(new Func1<School, School>() {
 @Override
 public School call(School school) {
    school.setName("NB大学");  //将所有学校改名
    return school;
 }}).buffer(schoolList.size())  //缓存起来,最后一起发送
.subscribe(new Action1<List<School>>() {
 @Override
 public void call(List<School> schools) {  
}});


Take:发射前n项数据,还是用上面的例子,假设不要改所有学校的名称了,就改前四个学校的名称:
Observable.from(schoolList).take(4).map(new Func1<School, School>() {
 @Override
 public School call(School school) {
    school.setName("NB大学");
    return school;
 }}).buffer(4).subscribe(new Action1<List<School>>() {
 @Override
 public void call(List<School> schools) {
 }});


Distinct:去掉重复的项,比较好理解:
Observable.just(1, 2, 1, 1, 2, 3)
    .distinct()
    .subscribe(new Action1<Integer>() {
       @Override
       public void call(Integer item) {
          System.out.println("Next: " + item);
       }
    });

输出
Next: 1
Next: 2
Next: 3


Filter:过滤,通过谓词判断的项才会被发射,例如,发射小于4的数据:
Observable.just(1, 2, 3, 4, 5)
    .filter(new Func1<Integer, Boolean>() {
       @Override
       public Boolean call(Integer item) {
          return( item < 4 );
       }
    }).subscribe(new Action1<Integer>() {
     @Override
     public void call(Integer item) {
          System.out.println("Next: " + item);
   }});

输出:
Next: 1
Next: 2
Next: 3



关于Subject

关于Subject,官方文档的解释是这样的:Subject可以看成是一个桥梁或者代理,在某些ReactiveX实现中(如RxJava),它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。从官方解释中,我提取出三个要点:
它可以充当Observable;
它可以充当Observer;
它是Observable和Observer之间的桥梁;

接下来对这三个要点解释一下,但在解释之前,要先介绍一下Subject的种类, Subject是一个抽象类,不能通过new来实例化Subject,所以Subject有四个实现类,分别为AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject,每个实现类都有特定的“技能”,下面结合代码来介绍一下它们各自的“技能”。注意,所有的实现类都由
create()
方法实例化,无需new,所有的实现类调用
onCompleted()
onError()
,它的Observer将不再接收数据;


Subject的分类解析

AsyncSubject

Observer会接收AsyncSubject的
`onComplete()
之前的最后一个数据,如果因异常而终止,AsyncSubject将不会释放任何数据,但是会向Observer传递一个异常通知。示例代码如下:
    AsyncSubject<String> asyncSubject = AsyncSubject.create();
    asyncSubject.onNext("asyncSubject1");
    asyncSubject.onNext("asyncSubject2");
    asyncSubject.onNext("asyncSubject3");  
    asyncSubject.onCompleted();
    asyncSubject.subscribe(new Observer<String>() {
       @Override
       public void onCompleted() {

          LogUtil.log("asyncSubject onCompleted");  //输出 asyncSubject onCompleted
       }

       @Override
       public void onError(Throwable e) {

          LogUtil.log("asyncSubject onError");  //不输出(异常才会输出)
       }

       @Override
       public void onNext(String s) {

          LogUtil.log("asyncSubject:"+s);  //输出asyncSubject:asyncSubject3
       }
    });


以上代码,Observer只会接收asyncSubject的onCompleted()被调用前的最后一个数据,即“asyncSubject3”,如果不调用onCompleted(),Subscriber将不接收任何数据。

BehaviorSubject

Observer会接收到BehaviorSubject被订阅之前的最后一个数据,再接收其他发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。(注意跟AsyncSubject的区别,AsyncSubject要手动调用onCompleted(),且它的Observer会接收到onCompleted()前发送的最后一个数据,之后不会再接收数据,而BehaviorSubject不需手动调用onCompleted(),它的Observer接收的是BehaviorSubject被订阅前发送的最后一个数据,两个的分界点不一样,且之后还会继续接收数据。)示例代码如下:
  BehaviorSubject<String> behaviorSubject = BehaviorSubject.create("default");
 behaviorSubject.onNext("behaviorSubject1");
 behaviorSubject.onNext("behaviorSubject2");
    behaviorSubject.subscribe(new Observer<String>() {
       @Override
       public void onCompleted() {

          LogUtil.log("behaviorSubject:complete");
       }

       @Override
       public void onError(Throwable e) {

          LogUtil.log("behaviorSubject:error");
       }

       @Override
       public void onNext(String s) {

          LogUtil.log("behaviorSubject:"+s);
       }
    });

    behaviorSubject.onNext("behaviorSubject3");
    behaviorSubject.onNext("behaviorSubject4");


以上代码,Observer会接收到behaviorSubject2、behaviorSubject3、behaviorSubject4,如果在
behaviorSubject.subscribe()
之前不发送behaviorSubject1、behaviorSubject2,则Observer会先接收到default,再接收behaviorSubject3、behaviorSubject4。

PublishSubject

PublishSubject比较容易理解,相对比其他Subject常用,它的Observer只会接收到PublishSubject被订阅之后发送的数据。示例代码如下:
  PublishSubject<String> publishSubject = PublishSubject.create();
 publishSubject.onNext("publishSubject1");
 publishSubject.onNext("publishSubject2");
 publishSubject.subscribe(new Observer<String>() {
       @Override
       public void onCompleted() {

       }

       @Override
       public void onError(Throwable e) {

       }

       @Override
       public void onNext(String s) {
          LogUtil.log("publishSubject observer1:"+s);
       }
    });
 publishSubject.onNext("publishSubject3");
 publishSubject.onNext("publishSubject4");


以上代码,Observer只会接收到"behaviorSubject3"、"behaviorSubject4"。

ReplaySubject

ReplaySubject会发射所有数据给观察者,无论它们是何时订阅的。也有其它版本的ReplaySubject,在重放缓存增长到一定大小的时候或过了一段时间后会丢弃旧的数据。示例代码如下:
ReplaySubject<String>replaySubject = ReplaySubject.create(); //创建默认初始缓存容量大小为16的ReplaySubject,当数据条目超过16会重新分配内存空间,使用这种方式,不论ReplaySubject何时被订阅,Observer都能接收到数据
//replaySubject = ReplaySubject.create(100);//创建指定初始缓存容量大小为100的ReplaySubject
//replaySubject = ReplaySubject.createWithSize(2);//只缓存订阅前最后发送的2条数据
//replaySubject=ReplaySubject.createWithTime(1,TimeUnit.SECONDS,Schedulers.computation());  //replaySubject被订阅前的前1秒内发送的数据才能被接收   
replaySubject.onNext("replaySubject:pre1");
replaySubject.onNext("replaySubject:pre2");
replaySubject.onNext("replaySubject:pre3");
replaySubject.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
          LogUtil.log("replaySubject:" + s);
    }
 });
replaySubject.onNext("replaySubject:after1");
replaySubject.onNext("replaySubject:after2");

如果你把 Subject 当作一个 Subscriber 使用,不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。要避免此类问题,官方提出了“串行化”,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:
SerializedSubject<String, Integer> ser = new SerializedSubject(publishSubject);



为什么说Subject既可充当Observable,又可充当Observer,是它们两个之间的桥梁呢。首先,从理论上讲,Subject继承了Observable,又实现了Observer接口,所以说它既是Observable又是Observer,完全合理。从实际应用上讲,Subject也能实现Observable和Observer相同的功能

创建Observable并发射数据:
  Observable.create(new Observable.OnSubscribe<String>() {
       @Override
       public void call(Subscriber<? super String> subscriber) {

          subscriber.onNext("I'm Observable");
          subscriber.onCompleted();
       }
    });


用Subject实现为:
PublishSubject<String> publishSubject = PublishSubject.create();
publishSubject.onNext("as Observable");
publishSubject.onCompleted();


创建Observer订阅Observable并接收数据:
mObservable.subscribe(new Observer<String>() {
    @Override
     public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {

          //接收数据
    }
});


用Subject实现为:
   publishSubject.subscribe(new Observer<String>() {
       @Override
       public void onCompleted() {

       }

       @Override
       public void onError(Throwable e) {

       }

       @Override
       public void onNext(String s) {

       }
    });


也许有人会问,不是说Subject也可以作为Observer,不能把Subject当作Observer传入subscribe()中吗?回答是:当然可以!就象这样:
PublishSubject<String> publishSubject = PublishSubject.create();
Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {

       subscriber.onNext("as Observer");
       subscriber.onCompleted();
    }
}).subscribe(publishSubject);


有没有发现问题?publishSubject没有重写
onNext()
方法啊,在哪接收的数据?这就是前面说的“桥梁”的问题了,尽管把Subject作为Observer传入
subscribe()
,但接收数据还是要通过Observer来接收,借用Subject来连接Observable和Observer,整体代码如下:
PublishSubject<String> publishSubject = PublishSubject.create();
  Observable.create(new Observable.OnSubscribe<String>() {
       @Override
       public void call(Subscriber<? super String> subscriber) {

          subscriber.onNext("as Bridge");
          subscriber.onCompleted();
       }
    }).subscribe(publishSubject);

    publishSubject.subscribe(new Observer<String>() {
       @Override
       public void onCompleted() {

       }

       @Override
       public void onError(Throwable e) {

       }

       @Override
       public void onNext(String s) {

          LogUtil.log("subject:"+s); //接收到 as Bridge
       }
    });




参考:

 深入浅出RxJava就这一篇就够了


RxJava + Retrofit让Android网络请求简单效率

RxJava 从入门到放弃再到不离不弃

Java8之Lambda表达式(Android用法)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: