模仿retrofit将okhttp和rxjava进行整合
2016-10-17 15:54
337 查看
最近学习Rxjava发现越用越好用,别的地方不表,主要是线程切换是我最中意的地方,另外非常灵活。刚开始接触Rxjava时接触的第一个操作符就是create,当时只知道这个方法用于构造一个基本的Observable也没多在意。后来随着学习的深入,发现越简单的东西其实用着越方便。create的主要作用是申明一个observable,在call方法中实现要做的操作,并在具体的位置调用onError,onCompleted,onNext。然后我又接触了retrofit这个框架,它是对okhttp的封装,其使用也是相当方便的,但是我发现它依然没有上传/下载文件的进度回调,于是我便产生了是否可以自己来写一个类似于retrofit一样的东西来练练手。想着很难,其实后来实现了发现蛮简单的,当然了没有retrofit那么精细,但是基本原理弄懂了就好。下面我们就开始动手吧。
首先来了解一下create的用法
create()方法给开发者从零开始创建一个Observable的能力。它使用一个OnSubscribe的参数对象,这个OnSubscribe继承于Action1,并且在有观察者订阅我们的Observable时执行call方法。
这个例子是rxjava essential里面的一个入门实例。我们创建一个新的Observable项,它执行一个产生5个元素的for循环,然后一个个的发射(onNext)这些元素,然后完成(onCompleted)发射。怎么样?够简单吧,下面我们就是要用这个东东加上okhttp来实现一个上传下载进度监听的例子。如果你对okhttp很了解可能会说进度监听很简单啊,这有什么了不起的,但是相信大家使用okhttp时都会遇到一个问题,那就是回调都是在一个子线程中,如果你想对组件进行操作,比如进度条等都是非常麻烦的事情。下面我们就开始用Rxjava对okhttp进行封装来实现一个山寨版的retrofit。注意?并没有用到注解!!!
okhttp的请求方式分为两种一种同步请求execute,另一种异步请求enqueue。首先,我们用同步的方式来进行封装代码如下;
那么如何对下载文件进度进行监听呢?我们知道okhttp提供了一个拦截器可以实现这个功能,代码如下:
代码解析:
1. 我们传入两个参数,一个是文件的下载地址,另一个是保存文件的路径。
2. 首先通过request的builder构建get请求。然后我们来创造一个Observable,其返回的的类型为String类型,代表的就是我们要观测的下载进度。
3. 接下来我们在call中调用okhttp的newBuilder()方法,这个方法跟okhttp3之前的copy方法一样,生成了一个新的client对象,它与我们定义好的client有着一模一样的属性,不 同的是它是新生成的,只使用一次。
4. 接着我们通过addNetworkInterceptor添加拦截器,这里其实还有一个addInterceptor也可以添加拦截器,他们的效果是一样的。在拦截器中我们对响应体进行了封装 ProgressResponseBody,并传入了我们的进度回调。
5. 最后我们调用的是异步的enqueue方法,在里面将获取的数据流保存到指定的文件中。
这样我们就实现了对下载进度的监听,上传进度的监听类似,这里就不多说了。最后给出ProgressResponseBody类和进度接口类。
接口类
源码链接地址
首先来了解一下create的用法
create()方法给开发者从零开始创建一个Observable的能力。它使用一个OnSubscribe的参数对象,这个OnSubscribe继承于Action1,并且在有观察者订阅我们的Observable时执行call方法。
Observable.create(new Observable.OnSubcribe<Object>()){ @Obverride public void call(Subscriber<? super Object> subscriber){ } });Observable 使用subscirber变量并根据不同条件来调用它的方法来和Observer通信。我们来看一个实例:
Observable<Integer> observablestring=Observable.create(new Observable.OnSubscribe<Integer>(){ @Override public void call(Subscriber<? super Integer> observer){ for(int i=0;i<5;i++){ observer.onNext(); } observer.onCompleted(); } });
这个例子是rxjava essential里面的一个入门实例。我们创建一个新的Observable项,它执行一个产生5个元素的for循环,然后一个个的发射(onNext)这些元素,然后完成(onCompleted)发射。怎么样?够简单吧,下面我们就是要用这个东东加上okhttp来实现一个上传下载进度监听的例子。如果你对okhttp很了解可能会说进度监听很简单啊,这有什么了不起的,但是相信大家使用okhttp时都会遇到一个问题,那就是回调都是在一个子线程中,如果你想对组件进行操作,比如进度条等都是非常麻烦的事情。下面我们就开始用Rxjava对okhttp进行封装来实现一个山寨版的retrofit。注意?并没有用到注解!!!
okhttp的请求方式分为两种一种同步请求execute,另一种异步请求enqueue。首先,我们用同步的方式来进行封装代码如下;
Observable.create(new Observable.OnSubscribe<Response>() { @Override public void call(Subscriber<? super Response> subscriber) { try{ subscriber.onNext(mOkHttpClient.newCall(builder.build()).execute()); }catch (Exception e){ e.printStackTrace(); subscriber.onError(e); } } });大家可以看到我们创建了一个新的Observable并且在call方法中调用了subscriber的onNext方法。因为execute()方法返回的是一个Response响应,所以这里我们返回的Observable也是Response类型的。接下来你只需要对Observable进行Rxjava操作就可以了。怎么样?是不是觉得其实并没有那么难,经过这样一次封装,再加上你对Rxjava有了一定了解的话,如何使用相信都已经不在话下了。
那么如何对下载文件进度进行监听呢?我们知道okhttp提供了一个拦截器可以实现这个功能,代码如下:
public Observable<String> downloadProgress(String url, final String srcFile){ final Request.Builder builder = new Request.Builder(); builder.url(DealUrl(url)).get(); return Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(final Subscriber<? super String> subscriber) { mOkHttpClient.newBuilder().addNetworkInterceptor(new Interceptor() { @Override public Response intercept(Chain chain) throws IOException { Response originalResponse = chain.proceed(chain.request()); return originalResponse.newBuilder() .body(new ProgressResponseBody(new ProgressListener() { @Override public void update(long bytesRead, long contentLength, boolean done) { subscriber.onNext(String.valueOf(100*bytesRead/contentLength)); } }, originalResponse.body())).build(); }}).build().newCall(builder.build()).enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { } @Override public void onResponse(Call call, Response response) throws IOException { InputStream is = null; byte[] buf = new byte[2048]; int len = 0; FileOutputStream fos = null; File desFile = new File(srcFile); if (!desFile.getParentFile().exists()) { desFile.getParentFile().mkdirs(); } try { is = response.body().byteStream(); if (!desFile.exists()) { Boolean creat = desFile.createNewFile(); } else { desFile.delete(); } fos = new FileOutputStream(desFile); while ((len = is.read(buf)) != -1) { fos.write(buf, 0, len); } fos.flush(); } catch (final IOException e) { e.printStackTrace(); <span style="color:#ff0000;">subscriber.onError(e);</span> } finally { try { if (is != null) is.close(); } catch (IOException e) { e.printStackTrace(); } try { if (fos != null) fos.close(); } catch (IOException e) { e.printStackTrace(); } } } }); } }); }
代码解析:
1. 我们传入两个参数,一个是文件的下载地址,另一个是保存文件的路径。
2. 首先通过request的builder构建get请求。然后我们来创造一个Observable,其返回的的类型为String类型,代表的就是我们要观测的下载进度。
3. 接下来我们在call中调用okhttp的newBuilder()方法,这个方法跟okhttp3之前的copy方法一样,生成了一个新的client对象,它与我们定义好的client有着一模一样的属性,不 同的是它是新生成的,只使用一次。
4. 接着我们通过addNetworkInterceptor添加拦截器,这里其实还有一个addInterceptor也可以添加拦截器,他们的效果是一样的。在拦截器中我们对响应体进行了封装 ProgressResponseBody,并传入了我们的进度回调。
5. 最后我们调用的是异步的enqueue方法,在里面将获取的数据流保存到指定的文件中。
这样我们就实现了对下载进度的监听,上传进度的监听类似,这里就不多说了。最后给出ProgressResponseBody类和进度接口类。
public class ProgressResponseBody extends ResponseBody{ private ResponseBody responseBody; private ProgressListener progressListener; private BufferedSource bufferedSource; public ProgressResponseBody(ProgressListener progressListener, ResponseBody responseBody) { this.progressListener = progressListener; this.responseBody = responseBody; } @Override public MediaType contentType() { return responseBody.contentType(); } @Override public long contentLength() { return responseBody.contentLength(); } @Override public BufferedSource source() { if (bufferedSource == null) { bufferedSource = Okio.buffer(source(responseBody.source())); } return bufferedSource; } private Source source(Source source) { return new ForwardingSource(source) { long totalBytesRead = 0L; @Override public long read(Buffer sink, long byteCount) throws IOException { long bytesRead = super.read(sink, byteCount); // read() returns the number of bytes read, or -1 if this source is exhausted. totalBytesRead += bytesRead != -1 ? bytesRead : 0; if(progressListener != null && responseBody.contentLength() != -1){ progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1); } return bytesRead; } }; } }
接口类
public interface ProgressListener{ void update(long bytesRead, long contentLength, boolean done); }
源码链接地址
相关文章推荐
- RxJava+Retrofit+OkHttp深入浅出-mvp(使用篇)
- 浅谈RxJava+Retrofit+OkHttp 封装使用
- RxJava+Retrofit+OkHttp深入浅出-终极封装二(网络请求)
- retrofit+okhttp+Rxjava封装
- Retrofit2+OkHttp3+RxJava搭建网络框架
- rxjava+retrofit+okhttp
- Android下利用RxJava和Retrofit进行联网返回bean对象的库
- 网络请求篇--RXJava+okhttp 进行网络下载操作
- Retrofit、RxJava和OkHttp使用
- retrofit+okhttp+rxjava
- RxAndroid+RxJava+Gson+retrofit+okhttp初步搭建android网络请求框架
- Android Retrofit+rxjava+okhttp请求网络的基本用法
- RxJava+Retrofit+OkHttp+网络缓存
- Retrofit+RxJava+RxAndroid+OkHttp简单封装
- Retrofit 2.0,RxJava(Android), OkHttp3.3
- Android MVP架构(RxJava+SQLBrite+Retrofit+OkHttp+Glide)
- MVP和Retrofit+Rxjava+OkHttp封装结合请求数据
- Android开发相关——Retrofit+RxJava+OkHttp(上)闲扯
- retrofit+RxJava+okhttp简便封装实现网络请求(详解)