您的位置:首页 > 编程语言 > Java开发

RxJava 2.0的基本使用

2017-03-09 15:27 381 查看
之前在android项目中使用的是RxJava 1.x和RxAndroid,结合Retrofit处理网络请求,比起Handler, AsyncTask确实开发起来方便很多,架构也比较清晰。

RxJava 2.0新版出来已经有段时间了,也计划着更新一下项目。

通过JetBrain IntelliJ IDEA创建一个Gradle工程。

加入RxJava的依赖

sourceCompatibility = 1.8

repositories {
//mavenLocal()
//maven{ url 'http://maven.aliyun.com/nexus/content/groups/public/'}
jcenter()
}

dependencies {
compile "io.reactivex.rxjava2:rxjava:2.0.7"
testCompile group: 'junit', name: 'junit', version: '4.11'
}


首先使用最普通的模式

//定义被观察者
//RxJava 2.0 保留了Observable, 不支持背压
Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter e) throws Exception {
// emitter  发射器

e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});

//定义观察者
Observer<String> observer = new Observer<String>(){
@Override
public void onSubscribe(Disposable d) {
// disposable 一次性的
System.out.println("onSubscribe");
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}

@Override
public void onError(Throwable e) {
System.out.println("onError");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
};

// 建立被观察者和观察者之间的订阅关系
observable.subscribe(observer);


接着使用支持背压的模式

//定义被观察者 Flowable支持背压
Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}, BackpressureStrategy.BUFFER);

//定义观察者
Subscriber subscriber = new Subscriber<String>(){
@Override
public void onSubscribe(Subscription s) {
System.out.println("onSubscribe");
//React Pull
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}

@Override
public void onError(Throwable t) {
System.out.println("onError");
}

@Override
public void onComplete() {
System.out.println("onComplete");
}
};

// 建立被观察者和观察者之间的订阅关系
flowable.subscribe(subscriber);


一种简化写法

String[] s = {"Hello", "world"};

Flowable.fromArray(s)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});


更外一种更简单的写法

Flowable.fromArray(s).subscribe(System.out::println);


参考资料

背压问题说明
https://github.com/ReactiveX/RxJava/wiki/Backpressure
官方网站
https://github.com/ReactiveX/RxJava http://www.vogella.com/tutorials/RxJava/article.html#rxjava-observable-types
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: