spring读写分离 - 事务配置篇(转)
2015-10-12 21:53
1146 查看
转自:http://jinnianshilongnian.iteye.com/blog/1720618
http://neoremind.net/2011/06/spring实现数据库读写分离
目前的一些解决方案需要在程序中手动指定数据源,比较麻烦,后边我会通过AOP思想来解决这个问题。
2、中间件
mysql-proxy:http://hi.baidu.com/geshuai2008/item/0ded5389c685645f850fab07
Amoeba for MySQL:http://www.iteye.com/topic/188598和http://www.iteye.com/topic/1113437
此处我们介绍一种在应用层的解决方案,通过spring动态数据源和AOP来解决数据库的读写分离。
该方案目前已经在一个互联网项目中使用了,而且可以很好的工作。
建议数据访问层使用jdbc、ibatis,不建议hibernate。
在应用层支持『当写时默认读操作到写库』,这样如果我们采用这种方案,在写操作后读数据直接从写库拿,不会产生数据复制的延迟问题;
应用层解决读写分离,理论支持任意数据库。
2、必须按照配置约定进行配置,不够灵活。
方案1:当只有读操作的时候,直接操作读库(从库);
当在写事务(即写主库)中读时,也是读主库(即参与到主库操作),这样的优势是可以防止写完后可能读不到刚才写的数据;
此方案其实是使用事务传播行为为:SUPPORTS解决的。
![](https://oscdn.geek-share.com/Uploads/Images/Content/201910/14/65cc574bac4fb5a82307b0b8c268c9b1.jpg)
方案2:当只有读操作的时候,直接操作读库(从库);
当在写事务(即写主库)中读时,强制走从库,即先暂停写事务,开启读(读从库),然后恢复写事务。
此方案其实是使用事务传播行为为:NOT_SUPPORTS解决的。
cn.javass.common.datasource.ReadWriteDataSourceDecision:读写库选择的决策者,具体参考javadoc;
cn.javass.common.datasource.ReadWriteDataSourceProcessor:此类实现了两个职责(为了减少类的数量将两个功能合并到一起了):读/写动态数据库选择处理器、通过AOP切面实现读/写选择,具体参考javadoc。
1.1、写库配置
1.2、读库配置
1.3、读写动态库配置
通过writeDataSource指定写库,通过readDataSourceMap指定从库列表,从库列表默认通过顺序轮询来使用读库,具体参考javadoc;
2、XML事务属性配置
所以读方法必须是read-only(必须,以此来判断是否是读方法)。
3、事务管理器
事务管理器管理的是readWriteDataSource
4、读/写动态数据库选择处理器
根据之前的txAdvice配置的事务属性决定是读/写,具体参考javadoc;
forceChoiceReadWhenWrite:用于确定在如果目前是写(即开启了事务),下一步如果是读,是直接参与到写库进行读,还是强制从读库读,具体参考javadoc;
5、事务切面和读/写库选择切面
1、事务切面一般横切业务逻辑层;
2、此处我们使用readWriteDataSourceTransactionProcessor的通过AOP切面实现读/写库选择功能,order=Integer.MIN_VALUE(即最高的优先级),从而保证在操作事务之前已经决定了使用读/写库。
6、测试用例
只要配置好事务属性(通过read-only=true指定读方法)即可,其他选择读/写库的操作都交给readWriteDataSourceTransactionProcessor完成。
可以参考附件的:
cn.javass.readwrite.ReadWriteDBTestWithForceChoiceReadOnWriteFalse
cn.javass.readwrite.ReadWriteDBTestWithNoForceChoiceReadOnWriteTrue
如何配置mysql数据库的主从?
单机配置mysql主从:http://my.oschina.net/god/blog/496常见的解决数据库读写分离有两种方案
1、应用层http://neoremind.net/2011/06/spring实现数据库读写分离
目前的一些解决方案需要在程序中手动指定数据源,比较麻烦,后边我会通过AOP思想来解决这个问题。
2、中间件
mysql-proxy:http://hi.baidu.com/geshuai2008/item/0ded5389c685645f850fab07
Amoeba for MySQL:http://www.iteye.com/topic/188598和http://www.iteye.com/topic/1113437
此处我们介绍一种在应用层的解决方案,通过spring动态数据源和AOP来解决数据库的读写分离。
该方案目前已经在一个互联网项目中使用了,而且可以很好的工作。
该方案目前支持
一读多写;当写时默认读操作到写库、当写时强制读操作到读库。考虑未来支持
读库负载均衡、读库故障转移等。使用场景
不想引入中间件,想在应用层解决读写分离,可以考虑这个方案;建议数据访问层使用jdbc、ibatis,不建议hibernate。
优势
应用层解决,不引入额外中间件;在应用层支持『当写时默认读操作到写库』,这样如果我们采用这种方案,在写操作后读数据直接从写库拿,不会产生数据复制的延迟问题;
应用层解决读写分离,理论支持任意数据库。
缺点
1、不支持@Transactional注解事务,此方案要求所有读方法必须是read-only=true,因此如果是@Transactional,这样就要求在每一个读方法头上加@Transactional 且readOnly属性=true,相当麻烦。 :oops:2、必须按照配置约定进行配置,不够灵活。
两种方案
当在写事务(即写主库)中读时,也是读主库(即参与到主库操作),这样的优势是可以防止写完后可能读不到刚才写的数据;
此方案其实是使用事务传播行为为:SUPPORTS解决的。
![](https://oscdn.geek-share.com/Uploads/Images/Content/201910/14/65cc574bac4fb5a82307b0b8c268c9b1.jpg)
方案2:当只有读操作的时候,直接操作读库(从库);
当在写事务(即写主库)中读时,强制走从库,即先暂停写事务,开启读(读从库),然后恢复写事务。
此方案其实是使用事务传播行为为:NOT_SUPPORTS解决的。
核心组件
cn.javass.common.datasource.ReadWriteDataSource:读写分离的动态数据源,类似于AbstractRoutingDataSource,具体参考javadoc;cn.javass.common.datasource.ReadWriteDataSourceDecision:读写库选择的决策者,具体参考javadoc;
cn.javass.common.datasource.ReadWriteDataSourceProcessor:此类实现了两个职责(为了减少类的数量将两个功能合并到一起了):读/写动态数据库选择处理器、通过AOP切面实现读/写选择,具体参考javadoc。
具体配置
1、数据源配置1.1、写库配置
1.2、读库配置
<bean id="readDataSource1" class="org.logicalcobwebs.proxool.ProxoolDataSource"> <property name="alias" value="readDataSource"/> <property name="driver" value="${read.connection.driver_class}" /> <property name="driverUrl" value="${read.connection.url}" /> <property name="user" value="${read.connection.username}" /> <property name="password" value="${read.connection.password}" /> <property name="maximumConnectionCount" value="${read.proxool.maximum.connection.count}"/> <property name="minimumConnectionCount" value="${read.proxool.minimum.connection.count}" /> <property name="statistics" value="${read.proxool.statistics}" /> <property name="simultaneousBuildThrottle" value="${read.proxool.simultaneous.build.throttle}"/> </bean>
1.3、读写动态库配置
通过writeDataSource指定写库,通过readDataSourceMap指定从库列表,从库列表默认通过顺序轮询来使用读库,具体参考javadoc;
<bean id="readWriteDataSource" class="cn.javass.common.datasource.ReadWriteDataSource"> <property name="writeDataSource" ref="writeDataSource"/> <property name="readDataSourceMap"> <map> <entry key="readDataSource1" value-ref="readDataSource1"/> <entry key="readDataSource2" value-ref="readDataSource1"/> <entry key="readDataSource3" value-ref="readDataSource1"/> <entry key="readDataSource4" value-ref="readDataSource1"/> </map> </property> </bean>
2、XML事务属性配置
所以读方法必须是read-only(必须,以此来判断是否是读方法)。
<tx:advice id="txAdvice" transaction-manager="txManager"> <tx:attributes> <tx:method name="save*" propagation="REQUIRED" /> <tx:method name="add*" propagation="REQUIRED" /> <tx:method name="create*" propagation="REQUIRED" /> <tx:method name="insert*" propagation="REQUIRED" /> <tx:method name="update*" propagation="REQUIRED" /> <tx:method name="merge*" propagation="REQUIRED" /> <tx:method name="del*" propagation="REQUIRED" /> <tx:method name="remove*" propagation="REQUIRED" /> <tx:method name="put*" read-only="true"/> <tx:method name="query*" read-only="true"/> <tx:method name="use*" read-only="true"/> <tx:method name="get*" read-only="true" /> <tx:method name="count*" read-only="true" /> <tx:method name="find*" read-only="true" /> <tx:method name="list*" read-only="true" /> <tx:method name="*" propagation="REQUIRED"/> </tx:attributes> </tx:advice>
3、事务管理器
事务管理器管理的是readWriteDataSource
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="readWriteDataSource"/> </bean>
4、读/写动态数据库选择处理器
根据之前的txAdvice配置的事务属性决定是读/写,具体参考javadoc;
forceChoiceReadWhenWrite:用于确定在如果目前是写(即开启了事务),下一步如果是读,是直接参与到写库进行读,还是强制从读库读,具体参考javadoc;
<bean id="readWriteDataSourceTransactionProcessor" class="cn.javass.common.datasource.ReadWriteDataSourceProcessor"> <property name="forceChoiceReadWhenWrite" value="false"/> </bean>
5、事务切面和读/写库选择切面
<aop:config expose-proxy="true"> <!-- 只对业务逻辑层实施事务 --> <aop:pointcut id="txPointcut" expression="execution(* cn.javass..service..*.*(..))" /> <aop:advisor advice-ref="txAdvice" pointcut-ref="txPointcut"/> <!-- 通过AOP切面实现读/写库选择 --> <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor"> <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/> </aop:aspect> </aop:config>
1、事务切面一般横切业务逻辑层;
2、此处我们使用readWriteDataSourceTransactionProcessor的通过AOP切面实现读/写库选择功能,order=Integer.MIN_VALUE(即最高的优先级),从而保证在操作事务之前已经决定了使用读/写库。
6、测试用例
只要配置好事务属性(通过read-only=true指定读方法)即可,其他选择读/写库的操作都交给readWriteDataSourceTransactionProcessor完成。
可以参考附件的:
cn.javass.readwrite.ReadWriteDBTestWithForceChoiceReadOnWriteFalse
cn.javass.readwrite.ReadWriteDBTestWithNoForceChoiceReadOnWriteTrue
package cn.javass.common.datasource; import java.sql.Connection; import java.sql.SQLException; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.jdbc.datasource.AbstractDataSource; import org.springframework.util.CollectionUtils; /** * * <pre> * 读/写动态选择数据库实现 * 目前实现功能 * 一写库多读库选择功能,请参考 * @see cn.javass.common.datasource.ReadWriteDataSourceDecision @see cn.javass.common.datasource.ReadWriteDataSourceDecision.DataSourceType * * 默认按顺序轮询使用读库 * 默认选择写库 * * 已实现:一写多读、当写时默认读操作到写库、当写时强制读操作到读库 * TODO 读库负载均衡、读库故障转移 * </pre> * @author Zhang Kaitao * */ public class ReadWriteDataSource extends AbstractDataSource implements InitializingBean { private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSource.class); private DataSource writeDataSource; private Map<String, DataSource> readDataSourceMap; private String[] readDataSourceNames; private DataSource[] readDataSources; private int readDataSourceCount; private AtomicInteger counter = new AtomicInteger(1); /** * 设置读库(name, DataSource) * @param readDataSourceMap */ public void setReadDataSourceMap(Map<String, DataSource> readDataSourceMap) { this.readDataSourceMap = readDataSourceMap; } public void setWriteDataSource(DataSource writeDataSource) { this.writeDataSource = writeDataSource; } @Override public void afterPropertiesSet() throws Exception { if(writeDataSource == null) { throw new IllegalArgumentException("property 'writeDataSource' is required"); } if(CollectionUtils.isEmpty(readDataSourceMap)) { throw new IllegalArgumentException("property 'readDataSourceMap' is required"); } readDataSourceCount = readDataSourceMap.size(); readDataSources = new DataSource[readDataSourceCount]; readDataSourceNames = new String[readDataSourceCount]; int i = 0; for(Entry<String, DataSource> e : readDataSourceMap.entrySet()) { readDataSources[i] = e.getValue(); readDataSourceNames[i] = e.getKey(); i++; } } private DataSource determineDataSource() { if(ReadWriteDataSourceDecision.isChoiceWrite()) { log.debug("current determine write datasource"); return writeDataSource; } if(ReadWriteDataSourceDecision.isChoiceNone()) { log.debug("no choice read/write, default determine write datasource"); return writeDataSource; } return determineReadDataSource(); } private DataSource determineReadDataSource() { //按照顺序选择读库 //TODO 算法改进 int index = counter.incrementAndGet() % readDataSourceCount; if(index < 0) { index = - index; } String dataSourceName = readDataSourceNames[index]; log.debug("current determine read datasource : {}", dataSourceName); return readDataSources[index]; } @Override public Connection getConnection() throws SQLException { return determineDataSource().getConnection(); } @Override public Connection getConnection(String username, String password) throws SQLException { return determineDataSource().getConnection(username, password); } }
package cn.javass.common.datasource; /** * <pre> * 读/写动态数据库 决策者 * 根据DataSourceType是write/read 来决定是使用读/写数据库 * 通过ThreadLocal绑定实现选择功能 * </pre> * @author Zhang Kaitao * */ public class ReadWriteDataSourceDecision { public enum DataSourceType { write, read; } private static final ThreadLocal<DataSourceType> holder = new ThreadLocal<DataSourceType>(); public static void markWrite() { holder.set(DataSourceType.write); } public static void markRead() { holder.set(DataSourceType.read); } public static void reset() { holder.set(null); } public static boolean isChoiceNone() { return null == holder.get(); } public static boolean isChoiceWrite() { return DataSourceType.write == holder.get(); } public static boolean isChoiceRead() { return DataSourceType.read == holder.get(); } }
package cn.javass.common.datasource; import java.lang.reflect.Field; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.NestedRuntimeException; import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource; import org.springframework.transaction.interceptor.RuleBasedTransactionAttribute; import org.springframework.transaction.interceptor.TransactionAttribute; import org.springframework.transaction.interceptor.TransactionInterceptor; import org.springframework.util.PatternMatchUtils; import org.springframework.util.ReflectionUtils; /** * * * <pre> * * 此类实现了两个职责(为了减少类的数量将两个功能合并到一起了): * 读/写动态数据库选择处理器 * 通过AOP切面实现读/写选择 * * * ★★读/写动态数据库选择处理器★★ * 1、首先读取<tx:advice>事务属性配置 * * 2、对于所有读方法设置 read-only="true" 表示读取操作(以此来判断是选择读还是写库),其他操作都是走写库 * 如<tx:method name="×××" read-only="true"/> * * 3、 forceChoiceReadOnWrite用于确定在如果目前是写(即开启了事务),下一步如果是读, * 是直接参与到写库进行读,还是强制从读库读<br/> * forceChoiceReadOnWrite:true 表示目前是写,下一步如果是读,强制参与到写事务(即从写库读) * 这样可以避免写的时候从读库读不到数据 * * 通过设置事务传播行为:SUPPORTS实现 * * forceChoiceReadOnWrite:false 表示不管当前事务是写/读,都强制从读库获取数据 * 通过设置事务传播行为:NOT_SUPPORTS实现(连接是尽快释放) * 『此处借助了 NOT_SUPPORTS会挂起之前的事务进行操作 然后再恢复之前事务完成的』 * 4、配置方式 * <bean id="readWriteDataSourceTransactionProcessor" class="cn.javass.common.datasource.ReadWriteDataSourceProcessor"> * <property name="forceChoiceReadWhenWrite" value="false"/> * </bean> * * 5、目前只适用于<tx:advice>情况 TODO 支持@Transactional注解事务 * * * * ★★通过AOP切面实现读/写库选择★★ * * 1、首先将当前方法 与 根据之前【读/写动态数据库选择处理器】 提取的读库方法 进行匹配 * * 2、如果匹配,说明是读取数据: * 2.1、如果forceChoiceReadOnWrite:true,即强制走读库 * 2.2、如果之前是写操作且forceChoiceReadOnWrite:false,将从写库进行读取 * 2.3、否则,到读库进行读取数据 * * 3、如果不匹配,说明默认将使用写库进行操作 * * 4、配置方式 * <aop:aspect order="-2147483648" ref="readWriteDataSourceTransactionProcessor"> * <aop:around pointcut-ref="txPointcut" method="determineReadOrWriteDB"/> * </aop:aspect> * 4.1、此处order = Integer.MIN_VALUE 即最高的优先级(请参考http://jinnianshilongnian.iteye.com/blog/1423489) * 4.2、切入点:txPointcut 和 实施事务的切入点一样 * 4.3、determineReadOrWriteDB方法用于决策是走读/写库的,请参考 * @see cn.javass.common.datasource.ReadWriteDataSourceDecision * @see cn.javass.common.datasource.ReadWriteDataSource * * </pre> * @author Zhang Kaitao * */ public class ReadWriteDataSourceProcessor implements BeanPostProcessor { private static final Logger log = LoggerFactory.getLogger(ReadWriteDataSourceProcessor.class); private boolean forceChoiceReadWhenWrite = false; private Map<String, Boolean> readMethodMap = new HashMap<String, Boolean>(); /** * 当之前操作是写的时候,是否强制从从库读 * 默认(false) 当之前操作是写,默认强制从写库读 * @param forceReadOnWrite */ public void setForceChoiceReadWhenWrite(boolean forceChoiceReadWhenWrite) { this.forceChoiceReadWhenWrite = forceChoiceReadWhenWrite; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if(!(bean instanceof NameMatchTransactionAttributeSource)) { return bean; } try { NameMatchTransactionAttributeSource transactionAttributeSource = (NameMatchTransactionAttributeSource)bean; Field nameMapField = ReflectionUtils.findField(NameMatchTransactionAttributeSource.class, "nameMap"); nameMapField.setAccessible(true); Map<String, TransactionAttribute> nameMap = (Map<String, TransactionAttribute>) nameMapField.get(transactionAttributeSource); for(Entry<String, TransactionAttribute> entry : nameMap.entrySet()) { RuleBasedTransactionAttribute attr = (RuleBasedTransactionAttribute)entry.getValue(); //仅对read-only的处理 if(!attr.isReadOnly()) { continue; } String methodName = entry.getKey(); Boolean isForceChoiceRead = Boolean.FALSE; if(forceChoiceReadWhenWrite) { //不管之前操作是写,默认强制从读库读 (设置为NOT_SUPPORTED即可) //NOT_SUPPORTED会挂起之前的事务 attr.setPropagationBehavior(Propagation.NOT_SUPPORTED.value()); isForceChoiceRead = Boolean.TRUE; } else { //否则 设置为SUPPORTS(这样可以参与到写事务) attr.setPropagationBehavior(Propagation.SUPPORTS.value()); } log.debug("read/write transaction process method:{} force read:{}", methodName, isForceChoiceRead); readMethodMap.put(methodName, isForceChoiceRead); } } catch (Exception e) { throw new ReadWriteDataSourceTransactionException("process read/write transaction error", e); } return bean; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } private class ReadWriteDataSourceTransactionException extends NestedRuntimeException { public ReadWriteDataSourceTransactionException(String message, Throwable cause) { super(message, cause); } } public Object determineReadOrWriteDB(ProceedingJoinPoint pjp) throws Throwable { if (isChoiceReadDB(pjp.getSignature().getName())) { ReadWriteDataSourceDecision.markRead(); } else { ReadWriteDataSourceDecision.markWrite(); } try { return pjp.proceed(); } finally { ReadWriteDataSourceDecision.reset(); } } private boolean isChoiceReadDB(String methodName) { String bestNameMatch = null; for (String mappedName : this.readMethodMap.keySet()) { if (isMatch(methodName, mappedName)) { bestNameMatch = mappedName; break; } } Boolean isForceChoiceRead = readMethodMap.get(bestNameMatch); //表示强制选择 读 库 if(isForceChoiceRead == Boolean.TRUE) { return true; } //如果之前选择了写库 现在还选择 写库 if(ReadWriteDataSourceDecision.isChoiceWrite()) { return false; } //表示应该选择读库 if(isForceChoiceRead != null) { return true; } //默认选择 写库 return false; } protected boolean isMatch(String methodName, String mappedName) { return PatternMatchUtils.simpleMatch(mappedName, methodName); } }
相关文章推荐
- Spring 事情具体详尽的解释
- Struts2-配置struts.xml
- spring 泛型依赖注入
- POJO和javaBean的区别
- Java 中深层理解父类引用指向子类对象
- 6 Spring入门 DispatcherServlet的工作方式
- Java中,父类引用指向子类对象的问题分析
- struts总结
- Java 命名规范
- Java回合阵列List
- [java学习笔记]java语言核心----面向对象之static关键字
- Java多线程21:多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask
- [Java] Java中带标签的break语句
- eclipse java工程和maven工程的互相转换
- switch增强_九九乘法表_goto_键盘的输入JAVA019-027
- 学习Java的第一天
- java Proxy(代理机制)
- Struts2 重定向
- Java随机数生成原理
- java反射与动态代理