您的位置:首页 > 编程语言 > Java开发

RxJava操作符记录

2017-12-08 14:14 232 查看
1throttleWithTimeOut限流,过滤
public void throttleWithTimeout() {
Subscription subscribe = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(i);
}
int sleep = 100;
if (i % 3 == 0) {
sleep = 300;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}

}
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation())
.throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -> logger("throttleWithTimeout:" + i));
addSubscription(subscribe);


打印结果
throttleWithTimeout:0 
throttleWithTimeout:3 
throttleWithTimeout:6 
throttleWithTimeout:9

分析
结果分析:每隔100毫秒发射一个数据,当要发射的数据是3的倍数的时候,下一个数据就延迟到300毫秒再发射 
即:0 -300ms-> 1 -100ms-> 2 -100ms-> 3 .. 
设定过滤时间为200ms,则1,2都被过滤丢弃。

2.deounce

public void debounce() {
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9).debounce(integer -> {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
//如果%2==0,则发射数据并调用了onCompleted结束,则不会被丢弃
if (integer % 2 == 0 && !subscriber.isUnsubscribed()) {
subscriber.onNext(integer);
subscriber.onCompleted();
}
}
});
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
logger("debounce:" + integer);
}
});
}


打印结果
debounce:2 
debounce:4 
debounce:6 
debounce:8 
debounce:9

由结果可知,9的打印证明默认调用了onCompleted


3.Distinct 去重

public void distinct(){
Observable.just(1, 2, 1, 1, 2, 3)
.distinct()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}

@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}



4.ElementAt 返回指定位置的数据

public void elementAt(){
Observable.just(0, 1, 2, 3, 4, 5).elementAt(2)
.subscribe(i -> logger("elementAt:" + i));
}



5.Filter 

public void filter() {
Observable.just(1, 2, 3, 4, 5)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer item) {
return (item < 4);
}
}).subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}

@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}


6.Skip 过滤前x项

Observable.just(0,
1,
2,
3,
4,
5).skip(2).subscribe(i
-> logger("Skip:"
+ i));


7.Take 只取前几个

Observable.just(0,
1,
2,
3,
4,
5).take(2).subscribe(i
-> logger("Take:"
+ i));

TakeLast:发射Observable发射的最后N项数据, 

takeLastBuffer:最后N项数据收集到list再发射

SkipLast:忽略Observable’发射的后N项数据,只保留前面的数据。 

skipLast操作符提交满足条件的结果给订阅者存在延迟效果


8Sample

createObserver().sample(1000,
TimeUnit.MILLISECONDS).subscribe(i -> logger("sample:"
+ i));

sample:3 (为啥是3开始,还没理解)
sample:8 
sample:13 
sample:18 


9ThrottleFirst

createObserver().throttleFirst(1000,
TimeUnit.MILLISECONDS).subscribe(i -> logger("throttleFirst:"
+ i));

throttleFirst:0 
throttleFirst:5 
throttleFirst:10 
throttleFirst:15

其中sample操作符会每隔5个数字发射出一个数据来, 
而throttleFirst则会每隔5个数据发射第一个数据。


10.ThrottleFirst 与RxBinding结合

防止重复点击触发

RxView.clicks(btnClick)
.throttleFirst(1, TimeUnit.SECONDS)
.subscribe(new Action1<Void>() {
@Override
public void call(Void aVoid) {
Toast.makeText(RxBindingButtonClick.this,"Click",Toast.LENGTH_SHORT).show();
}
});



11.ignoreElements

ignoreElements操作符忽略所有源Observable产生的结果,只把Observable的onCompleted和onError事件通知给订阅者。 
ignoreElements操作符适用于不太关心Observable产生的结果,只是在Observable结束时(onCompleted)或者出现错误时能够收到通知。

public void ignoreElements(){
Observable.just(1,2,3,4,5,6,7,8).ignoreElements()
.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
logger("Next: " + item);
}

@Override
public void onError(Throwable error) {
logger("Error: " + error.getMessage());
}

@Override
public void onCompleted() {
logger("Sequence complete.");
}
});
}


Sequence complete.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  rxjava