您的位置:首页 > 编程语言 > Java开发

RxJava2总结之自定义操作符与实用技巧

2017-08-18 11:09 232 查看

目录

目录

自定义操作符
lift 原理图

实用技巧
flatMap 与 zip 配合的实用范例

map的实用范例

方法泛型的实用范例

BehaviorSubject的使用技巧

Observable 发射元素的封装范例

参考文档 Thanks

自定义操作符

lift 原理图



@Test
public void lift(){
Observable.just(1,2)
//也是代理模式  observer是真正订阅
.lift(observer -> new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
observer.onNext(integer+"?");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
})
.subscribe(o -> System.out.println(o));
}
日志:
1?
2?


实用技巧

flatMap 与 zip 配合的实用范例

Observable.fromArray(new File("/Users/fuzhipeng/Documents"))
.flatMap(file -> Observable.fromArray(file.listFiles()))
//比较经典的 就是Observable.just(file) 把 file一个元素转成 observer从而进行zip合并的难题解决了
.flatMap(file ->
Observable.zip(Observable.just(file)
, Observable.timer(1, TimeUnit.SECONDS)
, (file1, aLong) -> file1))
.filter(file -> file.getName().endsWith(".png"))
.take(5)
.map(file -> file.getName())
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(s -> System.out.println(s));
while (true) {
}


map的实用范例

//有些服务几口设计,返回数据外层会包裹一些额外信息,可以使用map()吧外层格式剥掉
Observable.just(1)
.map(integer -> new Integer[]{1, 2, 3})
.subscribe(integers -> System.out.println(integers));


方法泛型的实用范例

Observable.just(1, "2", 3)
.cast(Integer.class)
.retryWhen(throwableObservable -> {
return throwableObservable.switchMap(throwable -> {
if (throwable instanceof IllegalArgumentException)
return Observable.just(throwable);
//todo  方法泛型 如果我不写<Object> 则会报错
return Observable.<Object>error(throwable);
//这个报错!!!
//                        return Observable.error(throwable);
});
})
.subscribe(o -> System.out.println("===>" + o + "\t")
, throwable -> System.out.println("===>throwable")
, () -> System.out.println("===>complete"));


BehaviorSubject的使用技巧

cache BehaviorSubject 是桥梁 并且有 发送最近的缓存特性!

BehaviorSubject<Object> cache = BehaviorSubject.create();
Observable.timer(1,TimeUnit.SECONDS)
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(cache);
//可以想象成上面是方法  这里是方法被调用
cache.subscribe(o -> System.out.println(o));//结果0


Observable 发射元素的封装范例

//创建一个Observable 可以直接发送的 原因 获取rx内部方法需要final很恶心 所以...
RxEmitter<Integer> emitter = new RxEmitter();
Observable.create(emitter)
.subscribe(integer -> System.out.println(integer));
emitter.onNext(1);
emitter.onNext(2);
public class RxEmitter<T> implements ObservableOnSubscribe<T>, ObservableEmitter<T> {
ObservableEmitter<T> e;
@Override
public void subscribe(ObservableEmitter<T> e) throws Exception {
this.e = e;
}
@Override
public void onNext(T value) {
e.onNext(value);
}
@Override
public void onError(Throwable error) {
e.onError(error);
}
@Override
public void onComplete() {
e.onComplete();
}
@Override
public void setDisposable(Disposable d) {
e.setDisposable(d);
}
@Override
public void setCancellable(Cancellable c) {
e.setCancellable(c);
}
@Override
public boolean isDisposed() {
return e.isDisposed();
}
@Override
public ObservableEmitter<T> serialize() {
return e.serialize();
}
@Override
public boolean tryOnError(Throwable t) {
return e.tryOnError(t);
}
}


参考文档 & Thanks

链接1

链接2

链接3

链接4

链接5

链接6
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: