您的位置:首页 > 编程语言 > Java开发

spring配置数据库负载均衡

2016-04-03 17:47 561 查看
package com.hengyu.ticket.common;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Created by lgf on 2016/4/3.
* 使用多台数据库服务器实现负载均衡
*/
public class CommonLoadBalancing {

// Classpath resource name of the JSON file that describes the database servers.
private static final String SERVER_CONFIG = "SERVER.JSON";
// Connection defaults shared by all servers; assigned by DBConfig.init() from the JSON file.
public static String DB_DRIVER;
public static String DB_PORT;
public static String DB_USER_NAME;
public static String DB_PASSWORD;
public static String DB_URL;

// Per-thread choice of server type (master or slave) consulted by getServer().
private static final ThreadLocal<String> SERVER_HOLDER = new ThreadLocal<>();

// Server type names.
private final static String SERVER_MASTER = "MASTER";
private final static String SERVER_SLAVE = "SLAVE";

// Index into `servers` of the server visited last by getServer(); -1 before the first call.
private static Integer CURRENT_INDEX = -1;
// Weight threshold a server must reach to be picked in the current rotation pass.
private static Integer CURRENT_WEIGHT = 0;
// Largest weight seen by initOrReload().
private static Integer MAX_WEIGHT = 0;
// Greatest common divisor of the server weights; getServer() lowers CURRENT_WEIGHT by this
// amount each time the rotation wraps around to index 0.
private static Integer GCD_WEIGHT = 1;
// Smallest weight seen by initOrReload() (the original comment called it a "divisor").
// NOTE(review): written by initOrReload() but not read anywhere in this class.
private static Integer MIN_WEIGHT = 1;
// Primary master server.
private static Server MASTER;
// Stand-by master returned by getMaster() when the primary is down.
private static Server MASTER_RESERVED;
// All registered servers, masters and slaves alike.
public static final List<Server> servers = Collections.synchronizedList(new ArrayList<Server>());
// Servers that are offline.
// NOTE(review): nothing in this class adds to this list; presumably filled by an external health check.
private static final List<Server> down_servers = Collections.synchronizedList(new ArrayList<Server>());

// Load SERVER.JSON and register the configured servers as soon as this class is initialized.
static {
DBConfig.init();
}

/**
 * Returns the server that should receive master (write) traffic.
 * <p>
 * The primary master is preferred. If it is missing or marked as down, the reserved
 * master is used instead, provided it exists and is not down.
 *
 * @return the primary master, or the reserved master when the primary is unavailable
 * @throws RuntimeException if neither the primary nor the reserved master is usable
 */
public static Server getMaster() {
    // Read each field once so a concurrent setMaster()/setMasterReserved() cannot
    // change the reference between the check and the return.
    Server primary = MASTER;
    if (primary != null && !primary.isDown()) {
        return primary;
    }
    Server reserved = MASTER_RESERVED;
    if (reserved != null && !reserved.isDown()) {
        return reserved;
    }
    throw new RuntimeException("error:tow master is down!");
}

/**
 * Replaces the primary master server.
 *
 * @param master the server to use as primary master from now on
 */
public static void setMaster(Server master) {
CommonLoadBalancing.MASTER = master;
}

/**
 * Registers a server without recomputing the weight statistics.
 * Delegates to {@code addServer(server, false)}.
 *
 * @param server the server to register
 */
public static void addServer(Server server) {
addServer(server, false);
}

/**
 * Registers a server with the balancer.
 * <p>
 * A server whose type is {@code MASTER} becomes the current primary master; a server
 * without a type is registered as a slave. A server that has no index yet receives its
 * position in the server list as index.
 *
 * @param server   the server to register
 * @param isReload when {@code true}, the weight statistics are recomputed afterwards
 * @throws RuntimeException if {@code server} is {@code null}
 */
public static void addServer(Server server, boolean isReload) {
    if (server == null) {
        throw new RuntimeException("error: server cant't not be null !");
    }
    if (server.getType() == null) {
        server.setType(SERVER_SLAVE);
    } else if (SERVER_MASTER.equals(server.getType())) {
        MASTER = server;
    }
    // The synchronized list uses itself as its mutex; holding it keeps the
    // size-based index and the insertion position consistent with each other.
    synchronized (servers) {
        if (server.getIndex() == null) {
            server.setIndex(servers.size());
        }
        servers.add(server);
    }
    if (isReload) {
        initOrReload();
    }
}

/**
 * Registers every server of the given list, in list order, and then recomputes
 * the weight statistics once.
 *
 * @param servers the servers to register
 */
public static void addServer(List<Server> servers) {
    for (Server each : servers) {
        addServer(each);
    }
    // One recomputation for the whole batch instead of one per server.
    CommonLoadBalancing.initOrReload();
}

/**
 * Recomputes the weight statistics used by the rotation in {@code getServer()}.
 * <p>
 * The maximum and minimum weight are determined from scratch over all servers that are
 * present and not down, so that a reload after a server went down (or was reweighted)
 * no longer keeps stale values. If no master has been chosen yet, the first registered
 * server is promoted to master. Finally the greatest common divisor of the weights is
 * refreshed.
 */
public synchronized static void initOrReload() {
    int max = 0;
    int min = Integer.MAX_VALUE;
    // Iterating a Collections.synchronizedList requires holding the list's own lock.
    synchronized (servers) {
        for (Server server : servers) {
            if (server == null || server.isDown()) {
                continue;
            }
            int weight = server.getWeight();
            if (weight > max) {
                max = weight;
            }
            if (weight < min) {
                min = weight;
            }
        }
        // Guard against an empty list: there is nothing to promote in that case.
        if (MASTER == null && !servers.isEmpty()) {
            MASTER = servers.get(0);
            MASTER.setType(SERVER_MASTER);
        }
        GCD_WEIGHT = gcd(servers);
    }
    MAX_WEIGHT = max;
    // Without any live server fall back to 1, the smallest weight a server can have.
    MIN_WEIGHT = (min == Integer.MAX_VALUE) ? 1 : min;
}

/**
 * Computes the greatest common divisor of the weights of the given servers.
 * <p>
 * {@code null} entries are skipped, matching how {@code initOrReload()} treats them.
 *
 * @param servers the servers whose weights are combined; may be {@code null} or empty
 * @return the greatest common divisor of all weights, or 1 when there is no server
 */
public static int gcd(List<Server> servers) {
    if (servers == null || servers.size() == 0) {
        return 1;
    }
    int divisor = 0;
    for (int i = 0; i < servers.size(); i++) {
        Server server = servers.get(i);
        if (server == null) {
            continue;
        }
        // Euclid's algorithm: fold the next weight into the running divisor.
        int remainder = server.getWeight();
        while (remainder != 0) {
            int next = divisor % remainder;
            divisor = remainder;
            remainder = next;
        }
        if (divisor == 1) {
            // Cannot get any smaller; no need to look at the remaining servers.
            break;
        }
    }
    return divisor < 1 ? 1 : divisor;
}

/**
 * Picks the server for the next database access.
 * <p>
 * When no server is registered, or the calling thread asked for the master through
 * {@code setServerType(false)}, the master is returned. Otherwise the thread's server type
 * is reset and the servers are visited round-robin: each time the rotation wraps around to
 * index 0 the weight threshold drops by the weight GCD (restarting from the maximum weight
 * once it reaches zero), and the first server that is up and whose weight reaches the
 * threshold is returned. If no such server is found within {@code size + 1} steps the
 * master is used as fallback.
 * <p>
 * The rotation state is shared by all threads, so it is advanced under the class lock,
 * the same lock {@code initOrReload()} holds while it rewrites the weight statistics.
 *
 * @return the selected server, with its access counter already incremented
 * @throws RuntimeException if the master is needed but no master is available
 */
public static Server getServer() {
    if (servers.isEmpty() || getServerType().equals(SERVER_MASTER)) {
        return getMaster().addAccessCount();
    }
    clearServerHolder();
    synchronized (CommonLoadBalancing.class) {
        int size = servers.size();
        // attempts runs from 0 to size inclusive, i.e. size + 1 probes like the original loop.
        for (int attempts = 0; size > 0 && attempts <= size; attempts++) {
            CURRENT_INDEX = (CURRENT_INDEX + 1) % size;
            if (CURRENT_INDEX == 0) {
                CURRENT_WEIGHT = CURRENT_WEIGHT - GCD_WEIGHT;
                if (CURRENT_WEIGHT <= 0) {
                    CURRENT_WEIGHT = MAX_WEIGHT;
                }
            }
            Server server = servers.get(CURRENT_INDEX);
            if (server != null && !server.isDown() && server.getWeight() >= CURRENT_WEIGHT) {
                return server.addAccessCount();
            }
        }
        // Every candidate was down or below the threshold.
        return getMaster().addAccessCount();
    }
}

/**
 * Returns the list of registered servers.
 * This is the internal synchronized list itself, not a copy.
 */
public static List<Server> getServers() {
return servers;
}

/**
 * Sets the stand-by master that getMaster() returns when the primary master is down.
 *
 * @param masterReserved the stand-by master server
 */
public static void setMasterReserved(Server masterReserved) {
CommonLoadBalancing.MASTER_RESERVED = masterReserved;
}

/**
 * Returns the list of offline servers.
 * This is the internal synchronized list itself, not a copy.
 */
public static List<Server> getDown_servers() {
return down_servers;
}

/**
 * Records, for the calling thread, which kind of server the next lookup should use.
 *
 * @param isReadOnly {@code true} selects the slave type, {@code false} the master type
 */
public static void setServerType(boolean isReadOnly) {
    String wanted = isReadOnly ? SERVER_SLAVE : SERVER_MASTER;
    SERVER_HOLDER.set(wanted);
}

/**
 * Resets the calling thread's server type to the default.
 * <p>
 * The thread-local entry is removed rather than overwritten: {@code getServerType()}
 * reports the slave type for a missing value, so the result is the same, and no entry
 * is left behind on threads that are reused by a pool.
 */
public static void clearServerHolder() {
    SERVER_HOLDER.remove();
}

/**
 * Returns the server type selected for the calling thread.
 *
 * @return the slave type when nothing (or the slave type) is recorded for this thread,
 *         otherwise the master type
 */
public static String getServerType() {
    String held = SERVER_HOLDER.get();
    boolean wantsMaster = held != null && !held.equals(SERVER_SLAVE);
    return wantsMaster ? SERVER_MASTER : SERVER_SLAVE;
}

/**
 * Description of one database server taking part in the load balancing.
 * <p>
 * The weight is always at least 1. The access counter is guarded by a dedicated lock
 * so concurrent increments are not lost.
 */
public static class Server {
    // Server identifier.
    private String id;
    // Position of the server in the balancer's list.
    private Integer index;
    // Host name or IP address.
    private String ip;
    // Weight used by the rotation; never below 1.
    private int weight = 1;
    // Server type (master or slave).
    private String type;
    // Database user name.
    private String username;
    // Database password.
    private String password;
    // Database port.
    private String port;
    // JDBC URL.
    private String url;
    // Number of times this server has been selected.
    private int accessCount = 0;
    // Stable monitor for accessCount. Locking the boxed counter itself does not work
    // because every increment would replace the object being locked.
    private final Object accessLock = new Object();
    // Whether the server is offline.
    private boolean isDown;

    /** Creates a server with no address and the minimum weight of 1. */
    public Server() {

    }

    /**
     * Creates a server for the given address.
     *
     * @param ip     host name or IP address
     * @param weight rotation weight; values below 1 are raised to 1
     */
    public Server(String ip, int weight) {
        this.ip = ip;
        this.weight = Math.max(1, weight);
    }

    @Override
    public String toString() {
        return "Server{type='" + type + '\'' + ", ip='" + ip + '\'' + ", weight=" + weight + ", accessCount=" + getAccessCount() + '}';
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getPort() {
        return port;
    }

    public void setPort(String port) {
        this.port = port;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public Integer getIndex() {
        return index;
    }

    public void setIndex(Integer index) {
        this.index = index;
    }

    /** Returns how many times this server has been selected. */
    public int getAccessCount() {
        synchronized (accessLock) {
            return accessCount;
        }
    }

    public void setAccessCount(int accessCount) {
        synchronized (accessLock) {
            this.accessCount = accessCount;
        }
    }

    /**
     * Increments the access counter by one.
     *
     * @return this server, so the call can be chained
     */
    public Server addAccessCount() {
        synchronized (accessLock) {
            this.accessCount++;
        }
        return this;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    /** Returns the rotation weight, which is always at least 1. */
    public int getWeight() {
        return weight;
    }

    /**
     * Sets the rotation weight.
     *
     * @param weight the new weight; values below 1 are raised to 1
     */
    public void setWeight(int weight) {
        this.weight = Math.max(1, weight);
    }

    public boolean isDown() {
        return isDown;
    }

    public void setDown(boolean down) {
        isDown = down;
    }
}

//数据库配置
public static class DBConfig {
/*
* { "username": "root", "password": "admin", "port": "3306", "driver": "com.mysql.jdbc.Driver"
* , "url": "jdbc:mysql://${host}:${port}/ticket?characterEncoding=utf8", "master": [ "127.0.0.1" ],
* "slave": [ "192.168.0.10 -w100", "192.168.0.11 -w50" ] }
*
* */

/**
 * Loads the server configuration from the classpath and registers the servers.
 * <p>
 * The JSON resource is read completely (a single {@code read()} is not guaranteed to
 * fill the buffer) and decoded as UTF-8. The shared connection defaults are stored in
 * the {@code DB_*} fields, then the "master" and optional "slave" arrays are turned
 * into servers.
 *
 * @throws RuntimeException if the resource is missing, cannot be read, or does not
 *                          define "master"
 */
public static void init() {
    try (InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(SERVER_CONFIG)) {
        if (in == null) {
            throw new RuntimeException("error:" + SERVER_CONFIG + " cat't not be null");
        }
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int read;
        while ((read = in.read(chunk)) != -1) {
            buffer.write(chunk, 0, read);
        }
        JSONObject base = JSON.parseObject(new String(buffer.toByteArray(), StandardCharsets.UTF_8));
        DB_USER_NAME = textOrEmpty(base, "username");
        DB_PASSWORD = textOrEmpty(base, "password");
        DB_URL = textOrEmpty(base, "url");
        DB_DRIVER = textOrEmpty(base, "driver");
        DB_PORT = textOrEmpty(base, "port");
        Object masterNode = base.get("master");
        if (masterNode == null) {
            throw new RuntimeException("error:" + SERVER_CONFIG + " must define \"master\"");
        }
        List<String> masters = JSON.parseArray(masterNode.toString(), String.class);
        List<String> slaves = null;
        Object slaveNode = base.get("slave");
        if (slaveNode != null) {
            slaves = JSON.parseArray(slaveNode.toString(), String.class);
        }
        createServer(masters, slaves);
    } catch (IOException e) {
        // Without its configuration the balancer has no servers; fail loudly, keeping the cause.
        throw new RuntimeException("error: failed to read " + SERVER_CONFIG, e);
    }
}

// Returns the string form of the value stored under key, or "" when the key is absent.
private static String textOrEmpty(JSONObject base, String key) {
    Object value = base.get(key);
    return value == null ? "" : value.toString();
}

/**
 * Builds servers from their textual definitions and registers them with the balancer.
 * <p>
 * Masters are created first, then slaves. Ids and indices are assigned consecutively
 * starting at 0 (a definition may override its id with {@code -i}). The first master
 * becomes the primary master and the second one the reserved master.
 *
 * @param masters definitions of the master servers; may be {@code null}
 * @param slave   definitions of the slave servers; may be {@code null}
 */
public synchronized static void createServer(List<String> masters, List<String> slave) {
    List<Server> created = new ArrayList<>();
    Server primary = null;
    Server reserved = null;

    // Master servers.
    if (masters != null) {
        for (String info : masters) {
            Server server = new Server();
            server.setId(String.valueOf(created.size()));
            createServer(SERVER_MASTER, server, info);
            server.setIndex(created.size());
            created.add(server);
            if (primary == null) {
                primary = server;
            } else if (reserved == null) {
                reserved = server;
            }
        }
    }

    // Slave servers.
    if (slave != null) {
        for (String info : slave) {
            Server server = new Server();
            server.setId(String.valueOf(created.size()));
            createServer(SERVER_SLAVE, server, info);
            server.setIndex(created.size());
            created.add(server);
        }
    }

    if (primary != null) {
        CommonLoadBalancing.MASTER = primary;
    }
    CommonLoadBalancing.addServer(created);
    // addServer() reassigns MASTER for every master-typed server it receives, which would
    // leave the last master as primary and make it identical to the reserved one.
    // Re-establish the intended roles afterwards.
    if (primary != null) {
        CommonLoadBalancing.MASTER = primary;
    }
    if (reserved != null) {
        CommonLoadBalancing.MASTER_RESERVED = reserved;
    }
}

/**
 * Fills in a server from one textual definition.
 * <p>
 * The server starts with the shared defaults (URL, port, user name, password) and the
 * given type. The definition is split on whitespace; a leading token that is not an
 * option is the host. Options may carry their value attached ({@code -w100}) or as the
 * following token ({@code -w 100}):
 * {@code -w} weight, {@code -u} user name, {@code -p} password, {@code -P} port,
 * {@code -U} URL, {@code -h} host, {@code -i} id.
 * Finally the {@code ${host}} and {@code ${port}} placeholders in the URL are replaced.
 *
 * @param type   server type to assign
 * @param server the server to populate
 * @param info   the textual definition
 * @throws IllegalArgumentException if the definition yields no host
 */
public static void createServer(String type, Server server, String info) {
    server.setUrl(DB_URL);
    server.setPort(DB_PORT);
    server.setUsername(DB_USER_NAME);
    server.setPassword(DB_PASSWORD);
    server.setType(type);
    // "\\s+" so that repeated blanks do not produce empty tokens.
    String[] tokens = info.trim().split("\\s+");
    for (int i = 0; i < tokens.length; i++) {
        String item = tokens[i];
        char flag = (item.length() >= 2 && item.charAt(0) == '-') ? item.charAt(1) : '\0';
        switch (flag) {
            case 'w':
                server.setWeight(Integer.parseInt(getConfigString(item, tokens, i)));
                break;
            case 'u':
                server.setUsername(getConfigString(item, tokens, i));
                break;
            case 'p':
                server.setPassword(getConfigString(item, tokens, i));
                break;
            case 'P':
                server.setPort(getConfigString(item, tokens, i));
                break;
            case 'U':
                server.setUrl(getConfigString(item, tokens, i));
                break;
            case 'h':
                server.setIp(getConfigString(item, tokens, i));
                break;
            case 'i':
                server.setId(getConfigString(item, tokens, i));
                break;
            default:
                // Not an option: only a leading token is meaningful, as the host.
                if (i == 0) {
                    server.setIp(item);
                }
                continue;
        }
        if (item.length() == 2) {
            // The value was the next token; skip it so that a value which happens to
            // start with '-' (e.g. a password) is not parsed as another option.
            i++;
        }
    }
    if (server.getIp() == null) {
        // The definition itself is not echoed because it may contain a password.
        throw new IllegalArgumentException("error: server definition has no host");
    }
    String _url = server.getUrl();
    server.setUrl(_url.replace("${host}", server.getIp()).replace("${port}", server.getPort()));
}

/**
 * Extracts the value of a command-line style option.
 *
 * @param item  the option token, e.g. {@code -w100} or {@code -w}
 * @param array all tokens of the definition
 * @param i     position of {@code item} within {@code array}
 * @return the text attached to the option, or the following token when the option
 *         consists of the two flag characters only; trimmed in both cases
 * @throws IllegalArgumentException if the option needs the following token but is the last one
 */
public static String getConfigString(String item, String[] array, int i) {
    if (item.length() != 2) {
        return item.substring(2).trim();
    }
    if (i + 1 >= array.length) {
        throw new IllegalArgumentException("error: option '" + item + "' requires a value");
    }
    return array[i + 1].trim();
}

}

}

 //数据源配置

package com.hengyu.ticket.common;

import org.springframework.jdbc.datasource.DriverManagerDataSource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Created by lgf on 2016/4/3.
*/
public class DynamicDataSource extends AbstractRoutingDataSource {

private static Map<Object, Object> targetDataSources = new HashMap<>();

//初始化数据源
public static void initDataSource(){
try {
List<CommonLoadBalancing.Server> servers = CommonLoadBalancing.getServers() ;
for (CommonLoadBalancing.Server serv
1bb8b
er:servers){
DriverManagerDataSource ds = new DriverManagerDataSource();
ds.setUsername(server.getUsername());
ds.setPassword(server.getPassword());
ds.setDriverClassName(CommonLoadBalancing.DB_DRIVER);
ds.setUrl(server.getUrl());
targetDataSources.put(String.valueOf(server.getId()),ds);
}
}catch (Exception e){
e.printStackTrace();
}
}

static {
initDataSource();
}

public DynamicDataSource() throws IOException {
super.setTargetDataSources(targetDataSources);
}

@Override
protected Object determineCurrentLookupKey() {
CommonLoadBalancing.Server server = CommonLoadBalancing.getServer();
System.out.println("=====>>获取数据源:" +server);
return server.getIndex().toString();
}

public Map<Object, Object> getTargetDataSources() {
return targetDataSources;
}

@Override
public void setTargetDataSources(Map<Object, Object> targetDataSources) {
this.targetDataSources = targetDataSources;
}

}

 //spring配置

 

package com.hengyu.ticket.common;

import org.aopalliance.intercept.MethodInvocation;
import org.aspectj.lang.JoinPoint;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.web.filter.OncePerRequestFilter;

import java.lang.reflect.Method;

/**
 * Created by lgf on 2016/4/4.
 * <p>
 * Looks up the transaction attribute of the intercepted method and tells
 * {@link CommonLoadBalancing} whether the call is read-only before the method proceeds.
 */
public class MethodInterceptor implements org.aopalliance.intercept.MethodInterceptor {

    @Autowired
    private TransactionInterceptor transactionInterceptor;

    /**
     * Selects the server type for the current thread and then proceeds with the invocation.
     * A method without a transaction attribute is treated as not read-only, so it is
     * routed to the master.
     */
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object target = invocation.getThis();
        Class<?> targetClass = target != null ? AopUtils.getTargetClass(target) : null;
        final TransactionAttribute txAttr = transactionInterceptor.getTransactionAttributeSource()
                .getTransactionAttribute(invocation.getMethod(), targetClass);
        // The attribute source returns null for methods that are not transactional.
        boolean readOnly = txAttr != null && txAttr.isReadOnly();
        System.out.println("*******" + readOnly);
        CommonLoadBalancing.setServerType(readOnly);
        return invocation.proceed();
    }
}

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.2.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.2.xsd">

<!-- Spring context wiring the routing data source, MyBatis, the transaction advice
     and the interceptor that selects the server type per service call. -->

<!-- Enable annotation-driven Spring MVC -->
<mvc:annotation-driven />

<!-- Component scan. The original comment says "controller classes", but the package
     scanned here is com.hengyu.ticket.service. -->
<context:component-scan base-package="com.hengyu.ticket.service" />

<!-- Load the properties file -->
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:config.properties</value>
</list>
</property>
</bean>

<!-- Routing data source; it builds its target data sources from SERVER.JSON by itself -->
<bean id="dynamicDataSource" class="com.hengyu.ticket.common.DynamicDataSource">
</bean>

<!-- MyBatis sqlSessionFactory, backed by the routing data source -->
<bean id="sqlSessionFactory" name="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dynamicDataSource"/>
<property name="plugins">
<array>
<bean class="com.hengyu.ticket.common.SqlIntercept">
<property name="show_sql" value="false"></property>
</bean>
</array>
</property>
<property name="configurationProperties">
<props>
<!-- 				<prop key="cacheEnabled">true</prop> -->
</props>
</property>
<!-- NOTE(review): this value is written with dots instead of a resource path such as
     classpath:com/hengyu/ticket/dao/*.xml; confirm that it really matches the mapper XML files. -->
<property name="mapperLocations" value="com.hengyu.ticket.dao.*.xml"/>
<property name="typeAliasesPackage" value="com.hengyu.ticket.entity"/>
</bean>

<!-- MapperScannerConfigurer: scans the DAO package for mapper interfaces -->
<bean id="mapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory"/>
<property name="basePackage" value="com.hengyu.ticket.dao"/>
<!-- 		<property name="sqlSessionTemplateBeanName" value="SqlSessionTemplate"/> -->
</bean>

<!-- Transaction manager -->
<bean id="txManager"
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dynamicDataSource"></property>
</bean>

<!-- Transaction advice: the get/find/select/load methods are declared read-only;
     MethodInterceptor reads this flag to choose the server type. -->
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes >
<tx:method name="insert*" propagation="REQUIRED" />
<tx:method name="add*" propagation="REQUIRED" />
<tx:method name="save*" propagation="REQUIRED" />
<tx:method name="update*" propagation="REQUIRED" />
<tx:method name="edit*" propagation="REQUIRED" />
<tx:method name="del*" propagation="REQUIRED" />
<tx:method name="remove*" propagation="REQUIRED" />
<tx:method name="get*" propagation="SUPPORTS" read-only="true" />
<tx:method name="find*" propagation="SUPPORTS" read-only="true" />
<tx:method name="select*" propagation="SUPPORTS" read-only="true" />
<tx:method name="load*" propagation="SUPPORTS" read-only="true" />
<tx:method name="*" />
</tx:attributes>
</tx:advice>

<!-- Both advisors use the same service-layer pointcut; methodInterceptor is given order 1.
     NOTE(review): the aspect "im" below declares only a pointcut and no advice;
     it appears to have no effect. Confirm before removing. -->
<aop:config>
<aop:advisor pointcut="execution(* com.hengyu.ticket.service.*.*(..))"
advice-ref="txAdvice"/>
<aop:advisor pointcut="execution(* com.hengyu.ticket.service.*.*(..))"
advice-ref="methodInterceptor" order="1"/>
<aop:aspect id="im" ref="methodInterceptor">
<aop:pointcut id="mi" expression="execution(* com.hengyu.ticket.service.*.*(..))"></aop:pointcut>
</aop:aspect>
</aop:config>

<!-- Interceptor that records the read-only flag of the called method for the balancer -->
<bean id="methodInterceptor" class="com.hengyu.ticket.common.MethodInterceptor"></bean>
</beans>

 //数据库配置SERVER.JSON

 

{
"username": "root",
"password": "admin",
"port": "3306",
"driver": "com.mysql.jdbc.Driver",
"url": "jdbc:mysql://${host}:${port}/ticket?characterEncoding=utf8",
"master": [
"127.0.0.1"
],
"slave": [
"localhost"
]
}

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: