您的位置:首页 > 理论基础 > 计算机网络

模仿retrofit将okhttp和rxjava进行整合

2016-10-17 15:54 337 查看
      最近学习Rxjava发现越用越好用,别的地方不表,主要是线程切换是我最中意的地方,另外非常灵活。刚开始接触Rxjava时接触的第一个操作符就是create,当时只知道这个方法用于构造一个基本的Observable也没多在意。后来随着学习的深入,发现越简单的东西其实用着越方便。create的主要作用是申明一个observable,在call方法中实现要做的操作,并在具体的位置调用onError,onCompleted,onNext。然后我又接触了retrofit这个框架,它是对okhttp的封装,其使用也是相当方便的,但是我发现它依然没有上传/下载文件的进度回调,于是我便产生了是否可以自己来写一个类似于retrofit一样的东西来练练手。想着很难,其实后来实现了发现蛮简单的,当然了没有retrofit那么精细,但是基本原理弄懂了就好。下面我们就开始动手吧。

首先来了解一下create的用法

create()方法给开发者从零开始创建一个Observable的能力。它使用一个OnSubscribe的参数对象,这个OnSubscribe继承于Action1,并且在有观察者订阅我们的Observable时执行call方法。

Observable.create(new Observable.OnSubcribe<Object>()){
@Obverride
public void call(Subscriber<? super Object> subscriber){
}
});
Observable 使用subscirber变量并根据不同条件来调用它的方法来和Observer通信。我们来看一个实例:
Observable<Integer> observablestring=Observable.create(new
Observable.OnSubscribe<Integer>(){
@Override
public void call(Subscriber<? super Integer> observer){
for(int i=0;i<5;i++){
observer.onNext();
}
observer.onCompleted();
}
});

      这个例子是rxjava essential里面的一个入门实例。我们创建一个新的Observable项,它执行一个产生5个元素的for循环,然后一个个的发射(onNext)这些元素,然后完成(onCompleted)发射。怎么样?够简单吧,下面我们就是要用这个东东加上okhttp来实现一个上传下载进度监听的例子。如果你对okhttp很了解可能会说进度监听很简单啊,这有什么了不起的,但是相信大家使用okhttp时都会遇到一个问题,那就是回调都是在一个子线程中,如果你想对组件进行操作,比如进度条等都是非常麻烦的事情。下面我们就开始用Rxjava对okhttp进行封装来实现一个山寨版的retrofit。注意?并没有用到注解!!!

okhttp的请求方式分为两种一种同步请求execute,另一种异步请求enqueue。首先,我们用同步的方式来进行封装代码如下;

Observable.create(new Observable.OnSubscribe<Response>() {
@Override
public void call(Subscriber<? super Response> subscriber) {
try{
subscriber.onNext(mOkHttpClient.newCall(builder.build()).execute());
}catch (Exception e){
e.printStackTrace();
subscriber.onError(e);
}
}
});
大家可以看到我们创建了一个新的Observable并且在call方法中调用了subscriber的onNext方法。因为execute()方法返回的是一个Response响应,所以这里我们返回的Observable也是Response类型的。接下来你只需要对Observable进行Rxjava操作就可以了。怎么样?是不是觉得其实并没有那么难,经过这样一次封装,再加上你对Rxjava有了一定了解的话,如何使用相信都已经不在话下了。

那么如何对下载文件进度进行监听呢?我们知道okhttp提供了一个拦截器可以实现这个功能,代码如下:

public Observable<String> downloadProgress(String url, final String srcFile){
final Request.Builder builder = new Request.Builder();
builder.url(DealUrl(url)).get();
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<? super String> subscriber) {
mOkHttpClient.newBuilder().addNetworkInterceptor(new Interceptor() {
@Override
public Response intercept(Chain chain) throws IOException {
Response originalResponse = chain.proceed(chain.request());
return originalResponse.newBuilder()
.body(new ProgressResponseBody(new ProgressListener() {
@Override
public void update(long bytesRead, long contentLength, boolean done) {
subscriber.onNext(String.valueOf(100*bytesRead/contentLength));
}
}, originalResponse.body())).build();
}}).build().newCall(builder.build()).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {

}

@Override
public void onResponse(Call call, Response response) throws IOException {
InputStream is = null;
byte[] buf = new byte[2048];
int len = 0;
FileOutputStream fos = null;
File desFile = new File(srcFile);
if (!desFile.getParentFile().exists()) {
desFile.getParentFile().mkdirs();
}
try {
is = response.body().byteStream();
if (!desFile.exists()) {
Boolean creat = desFile.createNewFile();
} else {
desFile.delete();
}
fos = new FileOutputStream(desFile);
while ((len = is.read(buf)) != -1) {
fos.write(buf, 0, len);
}
fos.flush();
} catch (final IOException e) {
e.printStackTrace();
<span style="color:#ff0000;">subscriber.onError(e);</span>
} finally {
try {
if (is != null) is.close();
} catch (IOException e) {
e.printStackTrace();
}
try {
if (fos != null) fos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
});
}

代码解析:

1. 我们传入两个参数,一个是文件的下载地址,另一个是保存文件的路径。

2. 首先通过request的builder构建get请求。然后我们来创造一个Observable,其返回的的类型为String类型,代表的就是我们要观测的下载进度。

3. 接下来我们在call中调用okhttp的newBuilder()方法,这个方法跟okhttp3之前的copy方法一样,生成了一个新的client对象,它与我们定义好的client有着一模一样的属性,不       同的是它是新生成的,只使用一次。

4. 接着我们通过addNetworkInterceptor添加拦截器,这里其实还有一个addInterceptor也可以添加拦截器,他们的效果是一样的。在拦截器中我们对响应体进行了封装                      ProgressResponseBody,并传入了我们的进度回调。

5. 最后我们调用的是异步的enqueue方法,在里面将获取的数据流保存到指定的文件中。

这样我们就实现了对下载进度的监听,上传进度的监听类似,这里就不多说了。最后给出ProgressResponseBody类和进度接口类。

public class ProgressResponseBody extends ResponseBody{

private ResponseBody responseBody;
private ProgressListener progressListener;
private BufferedSource bufferedSource;

public ProgressResponseBody(ProgressListener progressListener, ResponseBody responseBody) {
this.progressListener = progressListener;
this.responseBody = responseBody;
}

@Override public MediaType contentType() {
return responseBody.contentType();
}

@Override public long contentLength() {
return responseBody.contentLength();
}

@Override public BufferedSource source() {
if (bufferedSource == null) {
bufferedSource = Okio.buffer(source(responseBody.source()));
}
return bufferedSource;
}

private Source source(Source source) {
return new ForwardingSource(source) {
long totalBytesRead = 0L;

@Override public long read(Buffer sink, long byteCount) throws IOException {
long bytesRead = super.read(sink, byteCount);
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += bytesRead != -1 ? bytesRead : 0;
if(progressListener != null && responseBody.contentLength() != -1){
progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);
}
return bytesRead;
}
};
}
}


接口类

public interface ProgressListener{

void update(long bytesRead, long contentLength, boolean done);

}

源码链接地址
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: