您的位置:首页 > 理论基础 > 计算机网络

Volley源码解析与Okhttp+Rxjava组合构建网络库

2016-07-31 16:41 447 查看
概述
 
 之前在讨论组里听到许多讨论okhttp的话题,可见okhttp是一个相对成熟的解决方案,看到android4.4后网络访问的源码中HttpURLConnection已经替换成okhttp实现了,所以当时考虑来支持下,最近项目中新版应用效果良好,所以抽空来谈谈,顺便记录下.Volley源码解析,Okhttp介绍,Rxjava处理异步调度(主要是针对一些网络响应后,需要对数据进行一些复杂操作,耗时在子线程,更新在主线程,所以补了一个异步机制).源码感兴趣的可以关注我github开源项目:https://github.com/gongjr/ScanData

1.使用 OkHttp 作为传输层的实现优点

支持SPDY(speedy,一种开放网络传输协议,用来发送网页内容,基于传输控制协议(TCP)的应用层协议,SPDY协议通过压缩、多路复用和优先级来缩短加载时间,SPDY当前并不是一个标准协议,但SPDY的开发组已经开始推动SPDY成为正式标准(现为互联网草案[3]),HTTP/2主要以SPDY技术为主),最主要一点允许连接同一主机的所有请求分享一个socket。
如果SPDY不可用,会使用连接池减少请求延迟。
使用GZIP压缩下载内容,且压缩操作对用户是透明的。
利用响应缓存来避免重复的网络请求。
当网络出现问题的时候,OKHttp会依然有效,它将从常见的连接问题当中恢复。
如果你的服务端有多个IP地址,当第一个地址连接失败时,OKHttp会尝试连接其他的地址,这对IPV4和IPV6以及寄宿在多个数据中心的服务而言,是非常有必要的,OkHttp还处理了代理服务器问题和SSL握手失败问题。

2.明确一点okhttp是与HttpClient,HttpUrlConnection一样作为网络传输层的实现

当前Android提供了这两种通信实现类,这两者都支持HTTPS,流的上传和下载,配置超时,IPv6和连接池,已经足够满足我们大多数Http请求需求,但更高效的使用HTTP可以让我们的应用运行更快、更节省流量,效率更高.在Android
2.2版本之前,HttpClient拥有较少的bug,因此使用它是最好的选择。而在Android 2.3版本及以后,HttpURLConnection则是最佳的选择。它的API简单,体积较小,因而非常适用于Android项目。压缩和缓存机制可以有效地减少网络访问的流量,在提升速度和省电方面也起到了较大的作用。官方应该是建议4.4后,采用Okhttp来实现,所以基于此认识,完善了当前的网络库.

3.改善思路

使用 OkHttp 无需重写您程序中的网络代码。OkHttp实现了几乎和java.net.HttpURLConnection一样的API。如果用了 Apache
HttpClient,则OkHttp也提供了一个对应的okhttp-apache 模块, OkHttp使用起来不如Volley方便,缓存机制,请求队列管理策略,异步回调,重试机制等,OkHttp的回调都是在工作线程,所以如果在回调里面操作View的话,需要自己转换到UI线程,非常繁琐,所以需要封装.
Retrofit 2.0后默认使用的是OkHttp,2.0之前是可选的,本来打算用RxJava+Retrofit,当时因为这边从早期都是基于volley基础来实现的,包括后面根据项目需求,对volley进行了二次封装,由于熟悉volley的源码,底层设计,所以继续基于当前库进行扩展,顺便打算加入RxJava进行响应异步调度



上面是Volley的设计图,相信使用过的都不陌生.RequestQueue类作为volley的核心类,表示请求队列,里面包含一个CacheDispatcher(用于处理走缓存请求的调度线程)、NetworkDispatcher数组(用于处理走网络请求的调度线程),一个ResponseDelivery(返回结果分发接口),通过
start() 函数启动时会启动CacheDispatcher和NetworkDispatchers,已及对应的请求队列管理.
HttpStack接口:处理 Http 请求,返回请求结果。目前 Volley 中有基于 HttpURLConnection 的HurlStack和 基于 Apache HttpClient
的HttpClientStack,我们基于okhttp实现了OkHttpStack。

Network接口:调用HttpStack处理请求,并将结果转换为可被ResponseDelivery处理的NetworkResponse。

Cache接口:缓存请求结果,Volley 默认使用的是基于 sdcard 的DiskBasedCache。NetworkDispatcher得到请求结果后判断是否需要存储在 Cache,CacheDispatcher会从 Cache 中取缓存结果。
Volley 中大多数是基于接口的设计,可配置性强,扩展性高,我们可以基于自己业务需求扩展,比如继承Request接口实现,ResultMapRequest<T>当前项目架构下在用的,接口文档实体类规范的模式,泛型传入根据接口文档生成实体类型,直接在request中,将返回json数据,通过Gson转化为对应接口文档实体类抛出,那么网络接口的维护,就完全是接口文档与接口文档实体的维护,更加简洁方便了,之前文章已经介绍过了,就不多说了.

这边贴一下RequestQueue代码,里面有相关备注与理解
/**
* RequestQueue类作为volley的核心类,可以说是连接请求与响应的桥梁
*  一个拥有线程池的请求队列
* 调用add()分发,将添加一个用于分发的请求
* worker线程从缓存或网络获取响应,然后将该响应提供给主线程
*
* A request dispatch queue with a thread pool of dispatchers.
* Calling {@link #add(Request)} will enqueue the given Request for dispatch,
* resolving from either cache or network on a worker thread, and then delivering
* a parsed response on the main thread.
*/
public class RequestQueue {

/**
*  任务完成的回调接口
* Callback interface for completed requests.
*/
public static interface RequestFinishedListener<T> {
/** Called when a request has finished processing. */
public void onRequestFinished(Request<T> request);
}

/**
* 原子操作的Integer的类,线程安全的加减操作接口,记录队列中当前的请求数目
* Used for generating monotonically-increasing sequence numbers for requests.
*/
private AtomicInteger mSequenceGenerator = new AtomicInteger();

/**
* 等候缓存队列,重复请求集结map,每个queue里面都是相同的请求。
* 为什么需要这个map呢?map的key其实是request的url,如果我们有多个请求的url都是相同的,也就是说请求的资源是相同的,
* volley就把这些请求放入一个队列,在用url做key将队列放入map中。
* 因为这些请求都是相同的,可以说结果也是相同的。那么我们只要获得一个请求的结果,其他相同的请求,从缓存中取就可以了。
* 所以等候缓存队列的作用就是,当其中的一个request获得响应,我们就将这个队列放入缓存队列mCacheQueue中,让这些request去缓存获取结果就好了。
* 这是volley处理重复请求的思路。
* Staging area for requests that already have a duplicate request in flight.
* <ul>
*     <li>containsKey(cacheKey) indicates that there is a request in flight for the given cache
*          key.</li>
*     <li>get(cacheKey) returns waiting requests for the given cache key. The in flight request
*          is <em>not</em> contained in that list. Is null if no requests are staged.</li>
* </ul>
*/
private final Map<String, Queue<Request<?>>> mWaitingRequests =
new HashMap<String, Queue<Request<?>>>();

/**
* The set of all requests currently being processed by this RequestQueue. A Request
* will be in this set if it is waiting in any queue or currently being processed by
* any dispatcher.
* 队列当前拥有的所有请求的集合
* 请求在队列中或者正被调度中,都会在这个集合里
* 也就是下面mCacheQueue缓存队列与mNetworkQueue网络队列的总和
*/
private final Set<Request<?>> mCurrentRequests = new HashSet<Request<?>>();

/**
* 缓存队列
* The cache triage queue. */
private final PriorityBlockingQueue<Request<?>> mCacheQueue =
new PriorityBlockingQueue<Request<?>>();

/**
* 网络队列,PriorityBlockingQueue-阻塞优先级队列
* 线程安全,有阻塞功能,也就是说当队列里面没有东西的时候,线程试图从队列取请求,这个线程就会阻塞
* 根据Request实现compareTo接口可知:请求优先级高,则在队列中优先级高,
* 如果优先级相同,则根据mSequence序列号,来判断,先进先出
* The queue of requests that are actually going out to the network. */
private final PriorityBlockingQueue<Request<?>> mNetworkQueue =
new PriorityBlockingQueue<Request<?>>();

/**
* 默认用于调度的线程池数目
* Number of network request dispatcher threads to start. */
private static final int DEFAULT_NETWORK_THREAD_POOL_SIZE = 3;

/**
* 缓存接口,面向对象的思想,把缓存看成一个实体,此处只声明缓存实体的实现规则接口,
* 具体实现的时候,可以是用默认实现了Cache接口的DiskBasedCache类实体,也可以自定义扩展
* Cache interface for retrieving and storing responses. */
private final Cache mCache;

/**
* 网络接口,面向对象的思想,把网络看成一个实体,此处只声明网络实体的实现规则接口,
* 具体实现的时候,可以是默认实现了Network接口的BasicNetwork类实体,也可以自定义扩展
* Network interface for performing requests. */
private final Network mNetwork;

/**
* 响应分发器接口,负责把响应发给对应的请求,分发器存在的意义主要是为了耦合更加低并且能在主线程中操作UI
* 将网络响应的分发操作看成一个实体,此处声明实体的规则接口
* 具体实现时:可以是默认实现ResponseDelivery接口的ExecutorDelivery实体
* Response delivery mechanism. */
private final ResponseDelivery mDelivery;

/**
* 网络调度器数组,NetworkDispatcher继承了Thread类,其本质是多个线程,
* 所有线程都将被开启进入死循环,不断从mNetworkQueue网络队列取出请求,然后去网络Network请求数据
* The network dispatchers. */
private NetworkDispatcher[] mDispatchers;

/**
* 缓存调度器CacheDispatcher继承了Thread类,本质是一个线程,这个线程将会被开启进入一个死循环,
* 不断从mCacheQueue缓存队列取出请求,然后去缓存Cache中查找结果,如果没有请求则阻塞
* The cache dispatcher. */
private CacheDispatcher mCacheDispatcher;

/**
* 任务完成监听器队列
* 这个队列保留了很多监听器,这些监听器都是监听RequestQueue请求队列的,而不是监听单独的某个请求。
* RequestQueue中每个请求完成后,都会回调这个监听队列里面的所有监听器。
* 这是RequestQueue的统一管理的体现
*/
private List<RequestFinishedListener> mFinishedListeners =
new ArrayList<RequestFinishedListener>();

/**
* Creates the worker pool. Processing will not begin until {@link #start()} is called.
*
* @param cache A Cache to use for persisting responses to disk
* @param network A Network interface for performing HTTP requests
* @param threadPoolSize Number of network dispatcher threads to create
* @param delivery A ResponseDelivery interface for posting responses and errors
*/
public RequestQueue(Cache cache, Network network, int threadPoolSize,
ResponseDelivery delivery) {
mCache = cache;
mNetwork = network;
mDispatchers = new NetworkDispatcher[threadPoolSize];
mDelivery = delivery;
}

/**
* Creates the worker pool. Processing will not begin until {@link #start()} is called.
*
* @param cache A Cache to use for persisting responses to disk
* @param network A Network interface for performing HTTP requests
* @param threadPoolSize Number of network dispatcher threads to create
*/
public RequestQueue(Cache cache, Network network, int threadPoolSize) {
this(cache, network, threadPoolSize,
new ExecutorDelivery(new Handler(Looper.getMainLooper())));
}

/**
* Creates the worker pool. Processing will not begin until {@link #start()} is called.
*
* @param cache A Cache to use for persisting responses to disk
* @param network A Network interface for performing HTTP requests
*/
public
RequestQueue(Cache cache, Network network) {
this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
}

/**
* Starts the dispatchers in this queue.
*/
public void start() {
stop();  // Make sure any currently running dispatchers are stopped.
// Create the cache dispatcher and start it.
mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
mCacheDispatcher.start();

// Create network dispatchers (and corresponding threads) up to the pool size.
for (int i = 0; i < mDispatchers.length; i++) {
NetworkDispatcher networkDispatcher = new NetworkDispatcher(mNetworkQueue, mNetwork,
mCache, mDelivery);
mDispatchers[i] = networkDispatcher;
networkDispatcher.start();
}
}

/**
* Stops the cache and network dispatchers.
*/
public void stop() {
if (mCacheDispatcher != null) {
mCacheDispatcher.quit();
}
for (int i = 0; i < mDispatchers.length; i++) {
if (mDispatchers[i] != null) {
mDispatchers[i].quit();
}
}
}

/**
* Gets a sequence number.
*/
public int getSequenceNumber() {
return mSequenceGenerator.incrementAndGet();
}

/**
* Gets the {@link Cache} instance being used.
*/
public Cache getCache() {
return mCache;
}

/**
* 一个简单的过滤接口,在cancelAll()方法里面被使用
* A simple predicate or filter interface for Requests, for use by
* {@link RequestQueue#cancelAll(com.android.volley.RequestQueue.RequestFilter)}.
*/
public interface RequestFilter {
public boolean apply(Request<?> request);
}

/**
*  根据过滤器规则,取消相应请求
* Cancels all requests in this queue for which the given filter applies.
* @param filter The filtering function to use
*/
public void cancelAll(RequestFilter filter) {
synchronized (mCurrentRequests) {
for (Request<?> request : mCurrentRequests) {
if (filter.apply(request)) {
request.cancel();
}
}
}
}

/**
* 根据标记取消相应请求
* Cancels all requests in this queue with the given tag. Tag must be non-null
* and equality is by identity.
*/
public void cancelAll(final Object tag) {
if (tag == null) {
throw new IllegalArgumentException("Cannot cancelAll with a null tag");
}
cancelAll(new RequestFilter() {
@Override
public boolean apply(Request<?> request) {
return request.getTag() == tag;
}
});
}

/**
* Adds a Request to the dispatch queue.
* @param request The request to service
* @return The passed-in request
*/
public <T> Request<T> add(Request<T> request) {
// Tag the request as belonging to this queue and add it to the set of current requests.
request.setRequestQueue(this);
synchronized (mCurrentRequests) {
mCurrentRequests.add(request);
}
//设置请求序号,自动+1获取
// Process requests in the order they are added.
request.setSequence(getSequenceNumber());
request.addMarker("add-to-queue");
//如果该请求不缓存,直接添加到网络队列退出,跳过会出队列的判断
// If the request is uncacheable, skip the cache queue and go straight to the network.
if (!request.shouldCache()) {
mNetworkQueue.add(request);
return request;
}

// Insert request into stage if there's already a request with the same cache key in flight.
synchronized (mWaitingRequests) {
String cacheKey = request.getCacheKey();
if (mWaitingRequests.containsKey(cacheKey)) {
// There is already a request in flight. Queue up.
//如果已经有一个请求在工作,则排队等候
Queue<Request<?>> stagedRequests = mWaitingRequests.get(cacheKey);
if (stagedRequests == null) {
stagedRequests = new LinkedList<Request<?>>();
}
stagedRequests.add(request);
mWaitingRequests.put(cacheKey, stagedRequests);
if (VolleyLog.DEBUG) {
VolleyLog.v("Request for cacheKey=%s is in flight, putting on hold.", cacheKey);
}
} else {
// Insert 'null' queue for this cacheKey, indicating there is now a request in
// flight.为该key插入null,表明现在有一个请求在工作
mWaitingRequests.put(cacheKey, null);
mCacheQueue.add(request);
}
return request;
}
}

/**
* Called from {@link Request#finish(String)}, indicating that processing of the given request
* has finished.
*
* <p>Releases waiting requests for <code>request.getCacheKey()</code> if
*      <code>request.shouldCache()</code>.</p>
*/
<T> void finish(Request<T> request) {
// Remove from the set of requests currently being processed.
synchronized (mCurrentRequests) {
mCurrentRequests.remove(request);
}
synchronized (mFinishedListeners) {
for (RequestFinishedListener<T> listener : mFinishedListeners) {
listener.onRequestFinished(request);
}
}

if (request.shouldCache()) {//如果该请求要被缓存
synchronized (mWaitingRequests) {
String cacheKey = request.getCacheKey();
Queue<Request<?>> waitingRequests = mWaitingRequests.remove(cacheKey);
//移除等候缓存队列中的相同请求
if (waitingRequests != null) {
if (VolleyLog.DEBUG) {
VolleyLog.v("Releasing %d waiting requests for cacheKey=%s.",
waitingRequests.size(), cacheKey);
}
// Process all queued up requests. They won't be considered as in flight, but
// that's not a problem as the cache has been primed by 'request'.
//这里需要注意,一个request完成以后,会将waitingRequests里面所有相同的请求,
// 都加入到mCacheQueue缓存队列中,这就意味着,这些请求从缓存中取出结果就好了,
// 这样就避免了频繁相同网络请求的开销。这也是Volley的亮点之一。
mCacheQueue.addAll(waitingRequests);
}
}
}
}

public  <T> void addRequestFinishedListener(RequestFinishedListener<T> listener) {
synchronized (mFinishedListeners) {
mFinishedListeners.add(listener);
}
}

/**
* Remove a RequestFinishedListener. Has no effect if listener was not previously added.
*/
public  <T> void removeRequestFinishedListener(RequestFinishedListener<T> listener) {
synchronized (mFinishedListeners) {
mFinishedListeners.remove(listener);
}
}
}


关键在此处,NetworkDispatcher网络调度器,在阻塞从队列里面获取请求,拿到请求request后,调用网络传输层的接口HttpStack去获取数据,关于HttpStack的扩展实现,我们就是根据支持不同网络传输层来实现.OkHttpStack就是基于我们的Okhttp来实现的,那么最后获取网络的操作,就交给okhttp库区做,我们需要做的就是volley的请求,转换为okhttp的网络请求,拿到返回数据,那么来贴一下OkHttpStack的实现代码解析下吧.
/**
* OkHttp backed {@link HttpStack HttpStack} that does not
* use okhttp-urlconnection
* OkHttp的执行器,可用于替换原框架自带的HttpUrlConnection执行器
* 参考: https://gist.github.com/bryanstern/4e8f1cb5a8e14c202750 * https://gist.github.com/ceram1/8254f7a68d81172c1669 */
public class OkHttpStack implements HttpStack {

private final OkHttpClient mClient;

/**
* Create a OkHttpStack with default OkHttpClient.
*/
public OkHttpStack() {
this.mClient=new OkHttpClient();
}

public OkHttpStack(OkHttpClient client) {
this.mClient = client;
}

@Override
public HttpResponse performRequest(Request<?> request, Map<String, String> additionalHeaders)
throws IOException, AuthFailureError {

OkHttpClient client = mClient.clone();
int timeoutMs = request.getTimeoutMs();
client.setConnectTimeout(timeoutMs, TimeUnit.MILLISECONDS);
client.setReadTimeout(timeoutMs, TimeUnit.MILLISECONDS);
client.setWriteTimeout(timeoutMs, TimeUnit.MILLISECONDS);
//根据volley的请求request,生成okhttp的builder构建器
//OkHttpUtils.get().url(url).build().execute(new CallBack());链式调用最后执行
//全部对应操作都在Builder中,所有操作完成后返回当前最新的builder
com.squareup.okhttp.Request.Builder okHttpRequestBuilder = new com.squareup.okhttp.Request.Builder();
okHttpRequestBuilder.url(request.getUrl());//设置url
KLog.i(request.getUrl().toString());
Map<String, String> headers = request.getHeaders();//添加request的header
for (final String name : headers.keySet()) {
okHttpRequestBuilder.addHeader(name, headers.get(name));
}
//添加额外自定义header
for (final String name : additionalHeaders.keySet()) {
okHttpRequestBuilder.addHeader(name, additionalHeaders.get(name));
}
//根据volley请求的request,设置对应的builder,请求类型
setConnectionParametersForRequest(okHttpRequestBuilder, request);
//执行后得到相应response
com.squareup.okhttp.Request okHttpRequest = okHttpRequestBuilder.build();
Call okHttpCall = client.newCall(okHttpRequest);//不能被执行2次,每次new
Response okHttpResponse = okHttpCall.execute();
//将得到的response,转换为BasicHttpResponse返回
return entityFromOkHttpResponse(okHttpResponse);
}

private static BasicHttpResponse entityFromOkHttpResponse(Response okHttpResponse) throws IOException {
final int responseCode = okHttpResponse.code();
//先判断相应码,为-1,IO异常,直接抛出异常
if (responseCode == -1) {
// -1 is returned by getResponseCode() if the response code could not be retrieved.
// Signal to the caller that something was wrong with the connection.
throw new IOException("Could not retrieve response code from HttpUrlConnection.");
}

StatusLine responseStatus = new BasicStatusLine(parseProtocol(okHttpResponse.protocol()), okHttpResponse.code(), okHttpResponse.message());
BasicHttpResponse response = new BasicHttpResponse(responseStatus);

//生成response相应的body实体
BasicHttpEntity entity = new BasicHttpEntity();
ResponseBody body = okHttpResponse.body();
entity.setContent(body.byteStream());
entity.setContentLength(body.contentLength());
entity.setContentEncoding(okHttpResponse.header("Content-Encoding"));//从头部获取ContentEncoding
if (body.contentType() != null) {
entity.setContentType(body.contentType().type());
}
//设置ENTITY
response.setEntity(entity);
//遍历响应消息头部,将信息加入BasicHttpResponse里面
Headers responseHeaders = okHttpResponse.headers();
for (int i = 0, len = responseHeaders.size(); i < len; i++) {
final String name = responseHeaders.name(i), value = responseHeaders.value(i);
if (name != null) {
response.addHeader(new BasicHeader(name, value));
}
}
KLog.i(response.getStatusLine());
return response;
}

@SuppressWarnings("deprecation")
private static void setConnectionParametersForRequest(com.squareup.okhttp.Request.Builder builder, Request<?> request)
throws IOException, AuthFailureError {
switch (request.getMethod()) {
case Request.Method.DEPRECATED_GET_OR_POST:
// Ensure backwards compatibility.  Volley assumes a request with a null body is a GET.
byte[] postBody = request.getPostBody();
if (postBody != null) {
builder.post(RequestBody.create(MediaType.parse(request.getPostBodyContentType()), postBody));
}
break;
case Request.Method.GET:
builder.get();
break;
case Request.Method.DELETE:
builder.delete();
break;
case Request.Method.POST:
builder.post(createRequestBody(request));
break;
case Request.Method.PUT:
builder.put(createRequestBody(request));
break;
case Request.Method.HEAD:
builder.head();
break;
case Request.Method.OPTIONS:
builder.method("OPTIONS", null);
break;
case Request.Method.TRACE:
builder.method("TRACE", null);
break;
case Request.Method.PATCH:
builder.patch(createRequestBody(request));
break;
default:
throw new IllegalStateException("Unknown method type.");
}
}

private static ProtocolVersion parseProtocol(final Protocol p) {
switch (p) {
case HTTP_1_0:
return new ProtocolVersion("HTTP", 1, 0);
case HTTP_1_1:
return new ProtocolVersion("HTTP", 1, 1);
case SPDY_3:
return new ProtocolVersion("SPDY", 3, 1);
case HTTP_2:
return new ProtocolVersion("HTTP", 2, 0);
}

throw new IllegalAccessError("Unkwown protocol");
}

private static RequestBody createRequestBody(Request r) throws AuthFailureError {
final byte[] body = r.getBody();
if (body == null) return null;

return RequestBody.create(MediaType.parse(r.getBodyContentType()), body);
}
}
里面有一些是我当时加的备注,在来简单解析下,首先 HttpStack,作为网络传输层统一调用接口,就是在进行网络操作的时候调用HttpStack.performRequest(Request)
入参我们发起的请求Request,最终处理返回Response,关键就是如何实现这个方法:
根据volley的请求request,生成okhttp的builder构建器,发送请求,处理返回结果后抛出.
我们来了解下okhttp的简单使用
Request request = new Request.Builder()
.url("https://github.com/gongjr")
.header("User-Agent", "OkHttp Headers.java")
.addHeader("Accept", "application/json; q=0.5")
.addHeader("Accept", "application/vnd.github.v3+json")
.build();
Response response = client.newCall(request).execute();
细心的朋友可能注意到上面使用了
Call okHttpCall = client.newCall(okHttpRequest);
每次new的方式,避免其被再次调用,因为retry机制在volley中,在上一层策略处理.下面简单的从okhttp源码来认识下okhttp库的构成

4.Okhttp源码分析



此Okhttp总体设计图,来源其他博主,对深入了解的朋友可以看看源码解析
简单说,okhttp库主要是通过Diapatcher不断从RequestQueue中取出请求(Call),根据是否已缓存调用Cache或 Network这两类数据获取接口之一,从内存缓存或是服务器,得请求的数据。该引擎有同步和异步请求,同步请求通过Call.execute()直接返 回当前的Response.
前面部分在任何网络库中都会设计到,基本和volley请求策略设计是差不多的,但是关键一点是callback回来是在线程里面, 不能刷新UI,线程调度管理,在发起与回调并没有一体化处理,单独使用Okhttp,还是需要额外线程调度处理,所以我们还是统一使用volley管理,当然也有习惯volley的原因O(∩_∩)O~.




从OkHttpClient类的整体设计来看,它采用门面模式。client知晓子模块的所有配置以及提供需要的参数。client会将所有从客户端发来的请求委派到对应的子系统去处理。在该系统中,有多个子系统、类或者类的集合。例如上面的cache、连接以及连接池相关类的集合、网络配置相关类集合等。每个子系统都可以被客户端直接调 用,或者被门面角色调用。子系统并不知道门面的存在,对于子系统而言,门面仅仅是另外一个客户端而已。同时,OkHttpClient可以看作是整个框架
的上下文。

通过类图,其实很明显反应了该框架的几大核心子系统;路由、连接协议、拦截器、代理、安全性认证、连接池以及网络适配。从client大大降低了开发者使用难度,同时非常明了的展示了该框架在所有需要的配置以及获取结果的方式.里面关于SPDY的方式,允许连接同一主机的所有请求分享一个socket实现,以及通过拦截器对Request 的body进行gzip的压缩,来减少流量的传输等等.

楼主对Okhttp只是简单了解,还需要不断学习研究,就先简单介绍下.

OKHttp源码位置: https://github.com/square/okhttp

5.Rxjava介绍,及处理异步调度

首先来说下,为何引入了Rxjava来进行异步调度?

RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库.它就是一个实现异步操作的库,同样是做异步,为什么人们用它,而不用现成的
AsyncTask / Handler / XXX / ... ?

关键在与,它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁
RxJava
的异步实现,是通过一种扩展的观察者模式来实现的
RxJava
有四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,无订阅则不发出通知。

简单用法就是,观察者订阅被观察者,被观察者变化通知观察者,在这个机制里,方便地加入异步调度,切片处理事件响应.
操作步骤:
第一步,在发起网络请求,加入到QequestQueue时,生成一个Observable被观察对象,
将其抛出,在外部需要处理的地方订阅,等待响应网络返回变化
/**
* 执行请求任务,并返回一个RxJava的Observable类型
*/
public static Observable<Result> getResult(final Request<?> request, Object tag) {
addRequest(request,tag);
return RxBus.getDefault().take(Result.class)
.filter(new Func1<Result, Boolean>() {
@Override
public Boolean call(Result result) {
return request.getUrl().equals(result.url);
}
})
.take(1);
}
第二步,在需要发起网络请求的地方,发起请求,获得被观察者,订阅,定义时间响应
/**
* 获取数据,更新页面list列表
* 同时异步进行数据库缓存,页面提示缓存结果信息
*/
public void getGoodsInfoWithAsyncResult(){
Observable<Result> observable=HttpController.getInstance().getQueryQcWithAsyncResult(getUpdateTime(QcKey), new Response.Listener<JSONObject>() {
@Override
public void onResponse(JSONObject data) {
try {
if (data.getString("msg").equals("ok")) {
Gson gson = new Gson();
JSONObject datainfo = data.getJSONObject("data");
String info = datainfo.getString("info");
List<GoodsInfo> lHttpGoodsInfos = gson.fromJson(info, new TypeToken<List<GoodsInfo>>() {
}.getType());
mGoodsInfoAdapter.refreshDate(lHttpGoodsInfos);//刷新页面list列表
} else {
showShortTip("商品信息获取失败! " + data.getString("msg"));
dismissLoadingDF();
}
} catch (JSONException e) {
showShortTip("商品信息数据解析失败! ");
dismissLoadingDF();
e.printStackTrace();
}
}
}, new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
dismissLoadingDF();
showShortTip(VolleyErrorHelper.getMessage(error, mActivity));
}
});

/**
* 后台缓存数据处理耗时操作
*/
Func1 saveCacheBackThread=new Func1<Result, ResultMap>() {
@Override
public ResultMap call(Result result) {
JSONObject data=null;
ResultMap lMap=new ResultMap();
try {
String jsonString = new String(result.data,
HttpHeaderParser.parseCharset(result.header, HTTP.UTF_8));
data=new JSONObject(jsonString);
lMap.setMsg(data.getString("msg"));
lMap.setErrcode(data.getString("errcode"));
if (data.getString("msg").equals("ok")) {
Gson gson = new Gson();
JSONObject datainfo = data.getJSONObject("data");
String info = datainfo.getString("info");
List<GoodsInfo> lHttpGoodsInfos = gson.fromJson(info, new TypeToken<List<GoodsInfo>>() {
}.getType());
if (lHttpGoodsInfos != null && lHttpGoodsInfos.size() > 0) {
DataSupport.saveAll(lHttpGoodsInfos);
//数据库缓存数据,以及相关耗时操作
}else KLog.i("无数据");
}else {
KLog.i("errcode:"+data.getString("errcode")+" msg:"+data.getString("msg"));
}
}catch (Exception e){
e.printStackTrace();
}
return lMap;
}
};

/**
* 主线程响应处理事件
*/
Action1 updateMain=new Action1<ResultMap>() {
@Override
public void call(ResultMap data) {
if (data.getErrcode().equals("0")) {
showShortTip("商品信息缓存成功!");
} else {
showShortTip("缓存失败!" + data.getMsg());
}
}
};

/**
* 观察者对象,订阅被观察者,在事件响应时进行相关处理
*/
Subscription subscription = observable
.filter(new Func1<Result, Boolean>() {
@Override
public Boolean call(Result result) {
return result.data != null;
}
})
.map(saveCacheBackThread)//订阅事件处理注册
.subscribeOn(Schedulers.io())// 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread())// 指定 Subscriber 的回调发生在主线程
.subscribe(updateMain);//回调事件注册
}


上面是一个需求解决的代码,首先这个过程之前的处理实现没有介入Rxjava,数据库的操作很重,一次读写操作花费 10~20ms 是很常见的,这样的耗时很容易造成界面的卡顿。所以通常情况下,如果可以的话一定要避免在主线程中处理数据库。为了防止数据量多的时候,数据库访问,导致主线程负载过多,缓存放在子线程来异步进行,处理完成,更新主线程消息,用AsyncTask / Handler / EventBus /处理这个过程都比较繁琐,所以改了Rxjava,处理逻辑比较简洁了. 

这边是直接用了Rxjava,实现的Rxbus,同样实现一个观察者模式的消息机制,在原来的NetworkDispatcher网络调度器中,加入了异步响应的执行

//NetworkDispatcher网络调度器中执行网络操作,拿到返回
NetworkResponse networkResponse = mNetwork.performRequest(request);
//基于Rxjava的订阅机制,执行异步响应分发
if (networkResponse.data != null) {
RxBus.getDefault().post(new Result(request.getUrl(),
networkResponse.headers, networkResponse.data));
}
//正常的响应分发
ResponseDelivery.postResponse(request, response);
这样就加入一个异步响应机制.用不到的话就无视,有特殊需求可以调用处理. 
对于Rxjava想要了解更多的,推荐一个中文文档地址:https://github.com/mcxiaoke/RxDocs
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  Volley Okhttp 源码 Rxjava