您的位置:首页 > 编程语言 > Java开发

RxJava操作符总结之过滤

2017-07-28 11:31 330 查看

RxJava操作符总结之过滤

jsut()

just(T t1, T t2, T t3....)
,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送。

Observable.just("1","2","3")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});


1
2
3


repeat()

repeat()
将当前的消息序列无限制循环发送。我们能够传入一个參数表示循环的次数

Observable.just("1","2","3")
.repeat(3)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});


123123123


defer()

延迟创建
Observable


再订阅时创建
Observable
对象。该方法利用
call
方法的特性。

public static void main(String[] args) {

Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return getInt();
}
});

}

public static Observable<Integer> getInt() {

System.out.println("getInt()");
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?

super Integer> subscriber) {

System.out.print("ss");
subscriber.onNext(42);
}
});
}


此时
getInt()
方法不会被调用,会在
subscribe()
时调用。这个假设看过源代码非常easy理解。或者看我之前的博客RxJava 源代码走读之Observable.create()和subscribe()

range()

从指定数字開始发射数字。

Observable.range(3,2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer);
}
});


34


range(int start,int count)
第一个參数为从哪个数開始,第二个參数为发送多少个。

filter()

过滤作用。依据回调的条件对序列进行筛选。

查询0~49能被3整除的数。

private static ArrayList<Integer> array = new ArrayList<>();

public static void main(String[] args) {
init();
Observable.from(array)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {

//推断条件,假设返回false则该发送内容将取消,true将继续发送
return integer%3==0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});

}

public static void init(){
for (int i=0;i<50;i++){
array.add(i);
}
}


0 36 9 12 15 18 21 24 27 30 3336 39 42 45 48


take() takeLast()

take()
获取发射序列的前几个。后面的取消发送。

takeLast()
获取发射序列的后几个。其余的取消继续向下发送

获取0~49的前三个数和最后三个数

Observable.from(array)
.take(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});

System.out.println();
Observable.from(array)
.takeLast(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});


0 1 2
47 4849


distinct()

将发送序列中反复的值除去。即发送序列后面的值假设和前面有重叠,则后面的值不会被发送。 该方法去重时须要记录发送序列每一次发送的值。所以当有大数据时要注意发送的值。

Observable.from(array)
.take(3)
.repeat(3)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});


0 1 2


distinctUntilChanged()

该方法和
distinct()
的差别为,当前发射值与上一次发射值同样时则取消当前发射,假设不同样,则继续发射。

即所谓的有改变时发射。



first()和last()

故名思意。就是获取发射序列的第一个和最后一个。

同一时候。该方法能够依据条件进行选择符合条件的第一个和最后一个。

获取0~49中3的倍数的最后一个值

Observable.from(array)
.last(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%3==0&&integer!=0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});


48


skip()和skipLast()

跳过发射序列的前几个和最后几个 。

该方法和
take()
,
takeLast()
相似。

跳过0~49发射序列中的前三个和后三个

Observable.from(array)
.skip(3)
.skipLast(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});


34 5 6 7 8 9 10 11 12 1314 15 16 17 18 19 20 21 22 2324 25 26 27 28 29 30 31 32 3334 35 36 37 38 39 40 41 42 4344 45 46


elementAt()和elementAtOrDefault()

获取发射序列指定位置的发射值。

当中当我们指定位置大于发射序列时,会抛出异常。所以推荐使用带有默认值的
elementAtDefault()


对0~49的发射序列,获取前三个元素的发射后获取第五个位置的元素值。假设没有,则设置默认值为3.

Observable.from(array)
.take(3)
.elementAtOrDefault(5,3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});


3


interval()

轮询。该操作符每隔指定时间发送一次事件。

该方法默认在
conmputation
线程执行

Observable
.interval(3, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long s) {
Log.i("info",s);
}
}
);


第一个參数:延时时间 第二个參数:单位

该操作符会从0開始。每隔1秒发送一次

略微复杂点的,对于列表。我们要遍历打印此列表,则代码例如以下

Observable.interval(3,TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(array.get(aLong.intValue()));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("info","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i("info","error");
}

@Override
public void onNext(String s) {
Log.i("info","onNext--"+s);
}
});


04-1315:44:28.634 15455-15455/mahao.alex.rxjava I/info: onNext--aa
04-1315:44:31.634 15455-15455/mahao.alex.rxjava I/info: onNext--bb
04-1315:44:34.634 15455-15455/mahao.alex.rxjava I/info: onNext--cc
04-1315:44:37.634 15455-15455/mahao.alex.rxjava I/info: onNext--dd
04-1315:44:40.644 15455-15455/mahao.alex.rxjava I/info: error


打印例如以下,并且是每隔三秒。

打印一次。。

timer()

延迟固定时间后发送元素。

interval()
差别为该操作符仅仅发送一次。

Observable.timer(3,TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(array.get(aLong.intValue()));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("info","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i("info","error");
}

@Override
public void onNext(String s) {
Log.i("info","onNext--"+s);
}
});


04-1315:52:32.114 23036-23036/mahao.alex.rxjava I/info: onNext--aa
04-1315:52:32.114 23036-23036/mahao.alex.rxjava I/info: onCompleted


注意:该操作符执行在
conputation
线程中。

sample()

将发射序列每隔固定间隔获取其近期值并向下发送。

这个分为两种情况。

发送序列的时间间隔大于
sample
的时间间隔

发送序列的时间间隔小于
sample
的时间间隔

对于另外一种情况,就是每隔固定间隔发射就可以。而第一种情况存在的一种特殊情况

以下我们看一下样例

有一个数组{“aa”,”bb”,”cc”,”dd”}每隔三秒发射,而
sample
每隔两秒筛选。

Observable.interval(3,TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(array.get(aLong.intValue()));
}
})
.sample(2,TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("info","onCompleted");
}

@Override
public void onError(Throwable e) {
Log.i("info","error");
}

@Override
public void onNext(String s) {
Log.i("info","onNext--"+s);
}
});


04-1316:13:45.404 11935-11935/mahao.alex.rxjava I/info: onNext--aa
04-1316:13:49.404 11935-11935/mahao.alex.rxjava I/info: onNext--bb
04-1316:13:51.404 11935-11935/mahao.alex.rxjava I/info: onNext--cc
04-1316:13:55.404 11935-11935/mahao.alex.rxjava I/info: onNext--dd
04-1316:13:56.414 11935-11935/mahao.alex.rxjava I/info: error


看一下他们的事件间隔。四次发射的时间间隔为 4,2,4。最后error暂且不提。

为什么是这个时间间隔呢?

图尽管丑,但还是有一定道理的



再上一张好看的图



timeOut

指定最小的发射时间间隔,假设指定的当前时间间隔内没有发送元素。则抛出异常,停止。

debounce

当发送的数据的时间间隔小于
debounce
指定的时间间隔,则当前发送的数据将被过滤,假设在指定的时间间隔内仍没有数据发送,则会发送最后一个。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: