您的位置:首页 > 编程语言 > Java开发

RxJava学习1---创建

2017-12-20 14:41 113 查看
测试 版本:

compile 'io.reactivex.rxjava2:rxjava:2.1.7'


1.通过create创建:

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
Logger.log("subscribe","");
e.onNext("testTag");
e.onError(new NullPointerException());
e.onNext("testTag1");
e.onComplete();
}
});

Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Logger.log("onSubscribe","");
}

@Override
public void onNext(@NonNull String o) {
Logger.log("onNext",""+o);
}

@Override
public void onError(@NonNull Throwable e) {
Logger.log("onError","");
}

@Override
public void onComplete() {
Logger.log("onComplete","");
}
};

observable.subscribe(observer);


打印结果:

onSubscribe:
subscribe:
onNext:testTag
onError: //onError回调后就停止了


2.通过just创建多个

Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Logger.log("onSubscribe","");
}

@Override
public void onNext(@NonNull String o) {
Logger.log("onNext",""+o);
}

@Override
public void onError(@NonNull Throwable e) {
Logger.log("onError","");
}

@Override
public void onComplete() {
Logger.log("onComplete","");
}
};

Observable observable = Observable.just("testTag1","testTag2","testTag3","testTag4");
observable.subscribe(observer);//这里的传入类型可以是任意,但需要保证observer接受到的类型没错,不一致的时候需要进行类型转换


打印结果:

onSubscribe:
onNext:testTag1
onNext:testTag2
onNext:testTag3
onNext:testTag4
onComplete:


3.通过fromArray创建多个

Observable observable = Observable.fromArray("testTag1","testTag2","testTag3","testTag4");
observable.subscribe(observer);


或者:

Observable observable = Observable.fromArray(new String[]{"testTag1","testTag2","testTag3","testTag4"});//这里的传入类型可以是任意,但需要保证observer接受到的类型没错,不一致的时候需要进行类型转换
observable.subscribe(observer);


4.通过generate创建

Observable observable = Observable.generate(new Consumer<Emitter<String>>() {
@Override
public void accept(Emitter<String> emitter) throws Exception {      //通过重载调用emitter发送
emitter.onNext("testTag");
emitter.onComplete();//注意一定要调用这个方法,不然会一直回调重载
}
});
observable.subscribe(observer);


如果有初始化数据的话,可以这样做:

Observable observable = Observable.generate(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 2;//初始化数据,是Integer的2
}
}, new BiConsumer<Integer, Emitter<String>>() {
@Override
public void accept(Integer integer, Emitter<String> emitter) throws Exception {
//emitter发射出去的是String类型
emitter.onNext(integer.toString());
emitter.onComplete();

}
});
observable.subscribe(observer);
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: