我所理解的响应式编程
2015-06-17 18:08
363 查看
我所理解的响应式编程
函数响应式编程(FRP Functional Reactive
Programming),为解决现代编程问题提供了全新的视角.一旦理解它,可以极大地简化你的项目,特别是处理嵌套回调的异步事件,复杂的列表过滤和变换,或者时间相关问题。
我们现在大多使用的是命令式编程,命令式编程与函数相应式编程的区别如下:
命令式编程:以命令为主,给机器提供一条又一条的命令序列让其原封不动的执行。
函数响应式编程(FRP):使用异步数据流进行编程。FRP的思想比较难理解,需要我们将以往的命令式编程思想转变为响应式编程思想。我们要做的就是面向数据流编程。Everything is a stream
就像我们熟知的面向对象思想一样,把事物都看作Stream。变量、用户输入、属性、Cache、数据结构等等。这个开始可能会很难。
在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。e.g 电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。这很类似观察者模式(Gof)。函数响应式编程的重点是流,Stream能接受一个,甚至多个Stream为输入。你可以merge两个Stream,也可以从一个Stream中filter出你感兴趣的Events以生成一个新的Stream,还可以把一个Stream中的Data
values map到一个新的Stream中。
以上网上都有概括,我就不都说了,那么直接切入主题吧。就拿我项目中遇到的问题来说吧。我们这客户端的需求是这样的,当用户填写完地址后,点击提交按钮,将对地址进行反地理编码(就是将用户输入的地址转换成经纬度坐标)。然后调用上传接口将经纬度上传到后台服务器。
用户输入完地址点击提交按钮---调用反地理编码功能获取经纬度(asyn)---将经纬度提交后台服务器(asyn)
bean
代码简单易懂,commit方法实现了获取地理位置和提交功能的组合,这种组合方法简单易懂。只要在最外层捕捉异常就能做统一处理。
但是这种阻塞式风格明显不符合需求,请求地理位置、提交这些都应该是异步的,所以我们必须优化代码。我们最常干的事就是写回调。
以上回调分为两种
1.onLocationReceived,onSubmitReceived,onCommitReceived
2.onError
可以将这些回调抽取出来作为公共回调
注意前方高能
分析一下我们的回调形式,发现有一个共同特点有木有,(getLocation,submitLocation,commit)这些函数参数形式是一个参数一个回调对象。我们的优化是要在这些异步操作中返回一些临时对象。我们需要定义一个公共的对象来为异步操作作为返回对象。just do IT.
现在感觉逻辑结构清晰了点。
我们再来试试将代码分解成更小
高能要来了
在这个方法里,回调的层级还是太多了,下面所要做的就是在这里做优化。
在以上代码中,回调基本都是一致的onResult和onError,我们要将回调和功能代码进行分离。
在java中,我们不能将方法作为参数传递,所以我们需要通过类(和接口)来简介实现这样的功能。
接下来,我们来改造一下AsynJob
上面一步步的优化过程其实是RxJava功能的冰山一角。
AsynJob<T>实际上就是Observable,它不止可以只分发一个单一的结果也可以是一个序列(可以为空)。
CallBack<T>就是Observer,除了CallBack少了一个onNext(T t)方法。Observer中在onError(Throwable t)方法被调用后,会继而调用onCompleted(),然后Observer会包装好病发送出事件流(因为它能发送一个序列)。
abstract void start(Callback<T> callback)对应Subscription subscribe(final Observer<? super T> observer),这个方法也返回Subscription,不需要它时你可以决定取消接收事件流。
来看看我使用了RxAndroid后,代码的变化
是不是逻辑清晰多了,代码简洁了很多。
源码地址
除了RxJava,RxAndroid,还有
详情请点这里
写得不好,欢迎吐槽。博客参考自
函数响应式编程(FRP Functional Reactive
Programming),为解决现代编程问题提供了全新的视角.一旦理解它,可以极大地简化你的项目,特别是处理嵌套回调的异步事件,复杂的列表过滤和变换,或者时间相关问题。
我们现在大多使用的是命令式编程,命令式编程与函数相应式编程的区别如下:
命令式编程:以命令为主,给机器提供一条又一条的命令序列让其原封不动的执行。
函数响应式编程(FRP):使用异步数据流进行编程。FRP的思想比较难理解,需要我们将以往的命令式编程思想转变为响应式编程思想。我们要做的就是面向数据流编程。Everything is a stream
就像我们熟知的面向对象思想一样,把事物都看作Stream。变量、用户输入、属性、Cache、数据结构等等。这个开始可能会很难。
在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。e.g 电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。这很类似观察者模式(Gof)。函数响应式编程的重点是流,Stream能接受一个,甚至多个Stream为输入。你可以merge两个Stream,也可以从一个Stream中filter出你感兴趣的Events以生成一个新的Stream,还可以把一个Stream中的Data
values map到一个新的Stream中。
以上网上都有概括,我就不都说了,那么直接切入主题吧。就拿我项目中遇到的问题来说吧。我们这客户端的需求是这样的,当用户填写完地址后,点击提交按钮,将对地址进行反地理编码(就是将用户输入的地址转换成经纬度坐标)。然后调用上传接口将经纬度上传到后台服务器。
用户输入完地址点击提交按钮---调用反地理编码功能获取经纬度(asyn)---将经纬度提交后台服务器(asyn)
bean
package com.liang.frpdemo; /** * 地理位置bean * @author tomliang * */ public class LocationBean { public int lon; public int lat; }操作接口
package com.liang.frpdemo; /** * 操作接口 * @author tomliang */ public interface Api { /** * 将输入的地址反地理编码成经纬度 * @param address * @return */ LocationBean getLocation(String address); /** * 将经纬度提交给服务器 * @param bean */ void submitLocation(LocationBean bean); }业务逻辑helper类
package com.liang.frpdemo; public class LocationHelper { private Api api; private static LocationHelper helper = new LocationHelper(); private LocationHelper() { } public static LocationHelper getHelper(){ return helper; } void commit(String address){ try { LocationBean location = api.getLocation(address); api.submitLocation(location); } catch (Exception e) { e.printStackTrace(); } } }
代码简单易懂,commit方法实现了获取地理位置和提交功能的组合,这种组合方法简单易懂。只要在最外层捕捉异常就能做统一处理。
但是这种阻塞式风格明显不符合需求,请求地理位置、提交这些都应该是异步的,所以我们必须优化代码。我们最常干的事就是写回调。
package com.liang.frpdemo; /** * 操作接口 * @author tomliang */ public interface Api { /** * 将输入的地址反地理编码成经纬度 * @param address * @param getCallBack */ void getLocation(String address, LocationCallBack getCallBack); /** * 将经纬度提交给服务器 * @param submitCallBack */ void submitLocation(LocationBean bean, SubmitCallBack submitCallBack); /** * 获取地理位置回调 * @author tomliang * */ interface LocationCallBack{ void onLocationReceived(LocationBean bean); void onError(); } /** * 提交位置回调 * @author tomliang * */ interface SubmitCallBack{ void onSubmitReceived(); void onError(); } }然后我们的helper类变成了
package com.liang.frpdemo; import com.liang.frpdemo.Api.LocationCallBack; import com.liang.frpdemo.Api.SubmitCallBack; public class LocationHelper { private Api api; private static LocationHelper helper = new LocationHelper(); private LocationHelper() { api = new DefaultApi(); } public static LocationHelper getHelper(){ return helper; } void commit(String address, final CommitCallBack callback){ api.getLocation(address, new LocationCallBack() { @Override public void onLocationReceived(LocationBean bean) { api.submitLocation(bean, new SubmitCallBack() { @Override public void onSubmitReceived() { callback.onCommitReceived(); } @Override public void onError() { callback.onError(); } }); } @Override public void onError() { callback.onError(); } }); } public interface CommitCallBack{ void onCommitReceived(); void onError(); } }逻辑没变,但是看完这些回调是不是感觉不再爱了。组合已经没有了,错误需要我们手动一级级向外传递。接着优化
以上回调分为两种
1.onLocationReceived,onSubmitReceived,onCommitReceived
2.onError
可以将这些回调抽取出来作为公共回调
package com.liang.frpdemo; public interface CallBack<T> { void onResult(T result); void onError(); }新建一个ApiWrapper转换一下调用
package com.liang.frpdemo; import com.liang.frpdemo.Api.LocationCallBack; import com.liang.frpdemo.Api.SubmitCallBack; public class ApiWrapper { Api api; public ApiWrapper(){ api = new DefaultApi(); } public void getLocation(String address, final CallBack<LocationBean> callback){ api.getLocation(address, new LocationCallBack() { @Override public void onLocationReceived(LocationBean bean) { callback.onResult(bean); } @Override public void onError() { callback.onError(); } }); } public void submitLocation(LocationBean bean, final CallBack<Void> callback){ api.submitLocation(bean, new SubmitCallBack() { @Override public void onSubmitReceived() { callback.onResult(null); } @Override public void onError() { callback.onError(); } }); } }修改helper
package com.liang.frpdemo; public class LocationHelper { private ApiWrapper api; private static LocationHelper helper = new LocationHelper(); private LocationHelper() { api = new ApiWrapper(); } public static LocationHelper getHelper(){ return helper; } void commit(String address, final CallBack<Void> callback){ api.getLocation(address, new CallBack<LocationBean>() { @Override public void onResult(LocationBean result) { api.submitLocation(result, callback); } @Override public void onError() { callback.onError(); } }); }代码上相比之前是减少了不少,因为我们通过ApiWrapper来减少了回调间的层级调用。
注意前方高能
分析一下我们的回调形式,发现有一个共同特点有木有,(getLocation,submitLocation,commit)这些函数参数形式是一个参数一个回调对象。我们的优化是要在这些异步操作中返回一些临时对象。我们需要定义一个公共的对象来为异步操作作为返回对象。just do IT.
package com.liang.frpdemo; public abstract class AsynJob<T> { public abstract void start(CallBack<T> callback); }将异步的方法返回值变为该对象
package com.liang.frpdemo; import com.liang.frpdemo.Api.LocationCallBack; import com.liang.frpdemo.Api.SubmitCallBack; public class ApiWrapper { Api api; public ApiWrapper(){ api = new DefaultApi(); } public AsynJob<LocationBean> getLocation(final String address){ return new AsynJob<LocationBean>() { @Override public void start(final CallBack<LocationBean> callback) { api.getLocation(address, new LocationCallBack() { @Override public void onLocationReceived(LocationBean bean) { callback.onResult(bean); } @Override public void onError() { callback.onError(); } }); } }; } public AsynJob<Void> submitLocation(final LocationBean bean){ return new AsynJob<Void>() { @Override public void start(final CallBack<Void> callback) { api.submitLocation(bean, new SubmitCallBack() { @Override public void onSubmitReceived() { callback.onResult(null); } @Override public void onError() { callback.onError(); } }); } }; } }同样更改helper
package com.liang.frpdemo; public class LocationHelper { private ApiWrapper api; private static LocationHelper helper = new LocationHelper(); private LocationHelper() { api = new ApiWrapper(); } public static LocationHelper getHelper(){ return helper; } AsynJob<Void> commit(final String address){ return new AsynJob<Void>() { @Override public void start(final CallBack<Void> callback) { api.getLocation(address).start(new CallBack<LocationBean>() { @Override public void onResult(LocationBean result) { api.submitLocation(result).start(new CallBack<Void>() { @Override public void onResult(Void result) { callback.onResult(null); } @Override public void onError() { callback.onError(); } }); } @Override public void onError() { callback.onError(); } }); } }; } }
现在感觉逻辑结构清晰了点。
我们再来试试将代码分解成更小
package com.liang.frpdemo; public class LocationHelper { private ApiWrapper api; private static LocationHelper helper = new LocationHelper(); private LocationHelper() { api = new ApiWrapper(); } public static LocationHelper getHelper(){ return helper; } AsynJob<Void> commit(final String address){ final AsynJob<LocationBean> locationJob = api.getLocation(address); AsynJob<Void> submitJob = new AsynJob<Void>() { @Override public void start(final CallBack<Void> callback) { locationJob.start(new CallBack<LocationBean>() { @Override public void onResult(LocationBean result) { api.submitLocation(result).start(new CallBack<Void>(){ @Override public void onResult(Void result) { callback.onResult(result); } @Override public void onError() { callback.onError(); } } ); } @Override public void onError() { callback.onError(); } }); } }; return submitJob; } }好像清晰得不够明显啊。
高能要来了
AsynJob<Void> submitJob = new AsynJob<Void>() { @Override public void start(final CallBack<Void> callback) { locationJob.start(new CallBack<LocationBean>() { @Override public void onResult(LocationBean result) { api.submitLocation(result).start(new CallBack<Void>(){ @Override public void onResult(Void result) { callback.onResult(result); } @Override public void onError() { callback.onError(); } } ); } @Override public void onError() { callback.onError(); } }); } };
在这个方法里,回调的层级还是太多了,下面所要做的就是在这里做优化。
在以上代码中,回调基本都是一致的onResult和onError,我们要将回调和功能代码进行分离。
在java中,我们不能将方法作为参数传递,所以我们需要通过类(和接口)来简介实现这样的功能。
package com.liang.frpdemo; public interface Func<T, R> { R call(T t); }T对应于参数类型,R对应于返回类型。
接下来,我们来改造一下AsynJob
package com.liang.frpdemo; public abstract class AsynJob<T> { public abstract void start(CallBack<T> callback); public <R> AsynJob<R> map(final Func<T, AsynJob<R>> func){ final AsynJob<T> source = this; return new AsynJob<R>() { @Override public void start(final CallBack<R> callback) { source.start(new CallBack<T>() { @Override public void onResult(T result) { AsynJob<R> mapped = func.call(result); mapped.start(new CallBack<R>() { @Override public void onResult(R result) { callback.onResult(result); } @Override public void onError() { callback.onError(); } }); } @Override public void onError() { callback.onError(); } }); } }; } }再来看看helper
package com.liang.frpdemo; public class LocationHelper { private ApiWrapper api; private static LocationHelper helper = new LocationHelper(); private LocationHelper() { api = new ApiWrapper(); } public static LocationHelper getHelper(){ return helper; } AsynJob<Void> commit(final String address){ final AsynJob<LocationBean> locationJob = api.getLocation(address); AsynJob<Void> submitJob = locationJob.map(new Func<LocationBean, AsynJob<Void>>() { @Override public AsynJob<Void> call(LocationBean t) { return api.submitLocation(t); } }); return submitJob; } }是不是感觉很帅,这风骚的代码风格。
上面一步步的优化过程其实是RxJava功能的冰山一角。
AsynJob<T>实际上就是Observable,它不止可以只分发一个单一的结果也可以是一个序列(可以为空)。
CallBack<T>就是Observer,除了CallBack少了一个onNext(T t)方法。Observer中在onError(Throwable t)方法被调用后,会继而调用onCompleted(),然后Observer会包装好病发送出事件流(因为它能发送一个序列)。
abstract void start(Callback<T> callback)对应Subscription subscribe(final Observer<? super T> observer),这个方法也返回Subscription,不需要它时你可以决定取消接收事件流。
来看看我使用了RxAndroid后,代码的变化
package com.liang.frpdemo; import rx.Observable; import rx.Subscriber; /** * Created by Administrator on 2015/6/16. */ public class NewApiWrapper { Api api; public NewApiWrapper(){ api = new DefaultApi(); } public Observable<LocationBean> getLocation(final String address){ return Observable.create(new Observable.OnSubscribe<LocationBean>() { @Override public void call(final Subscriber<? super LocationBean> subscriber) { api.getLocation(address, new Api.LocationCallBack() { @Override public void onLocationReceived(LocationBean bean) { subscriber.onNext(bean); } @Override public void onError(Throwable e) { subscriber.onError(e); } }); } }); } public Observable<Void> submitLocation(final LocationBean bean){ return Observable.create(new Observable.OnSubscribe<Void>() { @Override public void call(final Subscriber<? super Void> subscriber) { api.submitLocation(bean, new Api.SubmitCallBack() { @Override public void onSubmitReceived() { subscriber.onNext(null); } @Override public void onError(Throwable e) { subscriber.onError(e); } }); } }); } }helper的变化
package com.liang.frpdemo; import rx.Observable; import rx.Subscriber; import rx.functions.Func1; public class NewLocationHelper { private NewApiWrapper api; private static NewLocationHelper helper = new NewLocationHelper(); private NewLocationHelper() { api = new NewApiWrapper(); } public static NewLocationHelper getHelper(){ return helper; } Observable<Void> commit(final String address){ Observable<LocationBean> locationJob = api.getLocation(address); Observable<Void> submitJob = locationJob.flatMap(new Func1<LocationBean, Observable<Void>>() { @Override public Observable<Void> call(LocationBean locationBean) { return api.submitLocation(locationBean); } }); return submitJob; } }
是不是逻辑清晰多了,代码简洁了很多。
源码地址
除了RxJava,RxAndroid,还有
详情请点这里
写得不好,欢迎吐槽。博客参考自
NotRxJava懒人专用指南
相关文章推荐
- 蚁群算法和简要matlab来源
- Java图形化界面设计——容器(JFrame)
- 八皇后问题python实现
- 贪吃蛇(修改Ⅰ版)
- C# 读取文件的修改时间、访问时间、创建时间
- Struts2(一)接收参数
- 简单说一下java中计时器,实际需要,没有详解
- Qt获取控件位置,坐标总结
- php dump mysql 数据库,生成数据库脚本
- JSP页面遍历对象集合,map,数组等,Struts的s标签和c标签的区别
- 【每日算法】C语言8大经典排序算法(2)
- 用Python写单向链表和双向链表
- php中header函数参数的Cache-control:private,no-cache,must-revalidate,max-age的使用方法
- 机器视觉开源代码集合
- 如何让打包的C++安装程序以管理员身份在Win7下运行
- 呵呵!手把手带你在 IIS 上运行 Python(转)
- 1.Lua 环境搭建
- C#之WCF入门1—简单的wcf例子
- JAVA中去掉空格
- Yii2框架之Cookie加密篇