您的位置:首页 > 编程语言 > Java开发

RxJava系列3:RxJava操作符-创建操作符

2017-12-20 18:22 393 查看

create

完整创建1个被观察者对象(Observable)。

需求场景:

快速的创建被观察者对象。

栗子:

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}

}).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {

}
});

Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}

}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {

}
});


just

快速创建1个被观察者对象(Observable)

发送事件的特点:直接发送 传入的事件

注:最多只能发送10个参数

应用场景

快速创建 被观察者对象(Observable) & 发送10个以下事件

栗子:

rx.Observable.just(1, 2, 4)
.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
Log.d(":...nht", "onCompleted");
}

@Override
public void onError(Throwable e) {
Log.d(":...nht", "onError");
}

@Override
public void onNext(Integer integer) {
Log.d(":...nht", integer + "");
}
});

12-20 09:05:54.907 8873-8873/com.sankuai.moviepro D/:...nht: 1
12-20 09:05:54.907 8873-8873/com.sankuai.moviepro D/:...nht: 2
12-20 09:05:54.907 8873-8873/com.sankuai.moviepro D/:...nht: 4
12-20 09:05:54.907 8873-8873/com.sankuai.moviepro D/:...nht: onCompleted


defer

作用

直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件

通过 Observable工厂方法创建被观察者对象(Observable)

每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的

应用场景

动态创建被观察者对象(Observable) & 获取最新的Observable对象数据

栗子:

Observable<Integer> observable = Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
if (flag) {
return Observable.just(1,2,3);
} else {
return Observable.just(6,7,8);
}
}
});

flag = false;
observable.subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Integer integer) {
Log.d(":...nht", "接收到的整数是" + integer);
}
});

12-20 09:27:56.112 9806-9806/com.sankuai.moviepro D/:...nht: 接收到的整数是6
12-20 09:27:56.112 9806-9806/com.sankuai.moviepro D/:...nht: 接收到的整数是7
12-20 09:27:56.112 9806-9806/com.sankuai.moviepro D/:...nht: 接收到的整数是8

分析:
只有在订阅时才会创建中心对象(被观察者对象),所以这个时候flag已经变为false了。所以输出的值如上。


timer

作用

快速创建1个被观察者对象(Observable)

发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)

本质 = 延迟指定时间后,调用一次 onNext(0)

应用场景

延迟指定事件,发送一个0,一般用于检测

栗子:

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long aLong) {
Log.d(":..nht", "接收到了事件"+ aLong  );
}
});

12-20 09:38:30.804 10617-10812/com.sankuai.moviepro D/:..nht: 接收到了事件0
注意这个只调用一次


interval

作用

快速创建1个被观察者对象(Observable)

发送事件的特点:每隔指定时间 就发送 事件

发送的事件序列 = 从0开始、无限递增1的的整数序列

栗子:

// 参数说明:
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(3, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(Long aLong) {
Log.d(":..nht", "接收到了事件"+ aLong  );
}
});

12-20 09:48:39.376 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件0
12-20 09:48:40.380 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件1
12-20 09:48:41.380 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件2
12-20 09:48:42.380 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件3
12-20 09:48:43.380 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件4
12-20 09:48:44.380 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件5
12-20 09:48:45.376 11109-11303/com.sankuai.moviepro D/:..nht: 接收到了事件6


intervalRange

作用

快速创建1个被观察者对象(Observable)

发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量

a. 发送的事件序列 = 从0开始、无限递增1的的整数序列

b. 作用类似于interval(),但可指定发送的数据的数量

// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 参数3 = 第1次事件延迟发送时间;
// 参数4 = 间隔时间数字;
// 参数5 = 时间单位
Observable.intervalRange(3,10,2, 1, TimeUnit.SECONDS)
// 该例子发送的事件序列特点:
// 1. 从3开始,一共发送10个事件;
// 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从0开始递增1,无限个)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()

@Override
public void onNext(Long value) {
Log.d(TAG, "接收到了事件"+ value  );
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}

@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}

});


range

作用

快速创建1个被观察者对象(Observable)

发送事件的特点:连续发送 1个事件序列,可指定范围

a. 发送的事件序列 = 从0开始、无限递增1的的整数序列

b. 作用类似于intervalRange(),但区别在于:无延迟发送事件

// 参数说明:
// 参数1 = 事件序列起始点;
// 参数2 = 事件数量;
// 注:若设置为负数,则会抛出异常
Observable.range(3,10)
// 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()

@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value  );
}

@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}

@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}

});


from

Javadoc: from(array)

Javadoc: from(Iterable)

Javadoc: from(Future)

Javadoc: from(Future,Scheduler)

Javadoc: from(Future,timeout, timeUnit)

将一个Iterable, 一个Future, 或者一个数组转换成一个Observable。

在RxJava中,from操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据

对于Future,它会发射Future.get()方法返回的单个数据。from方法有一个可接受两个可选参数的版本,分别指定超时时长和时间单位。如果过了指定的时长Future还没有返回一个值,这个Observable会发射错误通知并终止。

from默认不在任何特定的调度器上执行。然而你可以将Scheduler作为可选的第二个参数传递给Observable,它会在那个调度器上管理这个Future。

rangeLong,类似range

Empty/Never/Throw

一般用于测试使用

<– empty() –>

// 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成

Observable observable1=Observable.empty();

// 即观察者接收后会直接调用onCompleted()

<– error() –>

// 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常

// 可自定义异常

Observable observable2=Observable.error(new RuntimeException())

// 即观察者接收后会直接调用onError()

<– never() –>

// 该方法创建的被观察者对象发送事件的特点:不发送任何事件

Observable observable3=Observable.never();

// 即观察者接收后什么都不调用

参考资料

Android RxJava:最基础的操作符详解 - 创建操作符

https://www.jianshu.com/p/e19f8ed863b1

以上这个博主写了一系列的rxjava,通俗易懂,且算是比较全面了。

延迟创建

定时操作:在经过了x秒后,需要自动执行y操作

周期性操作:每隔x秒后,需要自动执行y操作

RxJava图示如何理解:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: