Redis 主从复制、读写分离、高可用(七)-part 3
2017-06-26 16:29
761 查看
这里我用docker实现:
结构:
配置文件和前面改动一样,config 要注意的是:
6379的配置改动的地方:
sentinel.conf
准备工作完成~~~~
ps:启动各个redis,和sentinel,记得把 主从关系搭建好!
java:代码 贴一些关键的(读写分离,自动读取或写入 最新的redis,和发现最新的从redis):
properties:
核心代码(利用前面学过的):
代码包:地址
使用:
结构:
[root@14ba6e5d00c2 redis]# ls redis-3.2.8 redis-6379 redis-6380 redis-6381
配置文件和前面改动一样,config 要注意的是:
#bind 127.0.0.1 bind 0.0.0.0
6379的配置改动的地方:
bind 0.0.0.0 port 6379 daemonize yes pidfile "/sys/fs/cgroup/redis_6379.pid" logfile "/sys/fs/cgroup/redis_6379.log" dbfilename "dump6379.rdb" dir "/sys/fs/cgroup" appendonly no appendfilename "appendonly.aof"
sentinel.conf
sentinel monitor mymaster 127.0.0.1 6379 1
准备工作完成~~~~
ps:启动各个redis,和sentinel,记得把 主从关系搭建好!
java:代码 贴一些关键的(读写分离,自动读取或写入 最新的redis,和发现最新的从redis):
import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import redis.clients.jedis.JedisPoolConfig; import cn.com.qiqi.springMvcTest.core.JedisSentinelPoolExt; import cn.com.qiqi.springMvcTest.core.RedisDataSource; import cn.com.qiqi.springMvcTest.core.RedisDataSourceImpl; @Configuration @PropertySource(value="classpath:/redis.properties") public class RedisConfig { @Value("${redis.masterName}") private String masterName; @Value("${redis.sentinals}") private String sentinal_hostAndPorts; @Bean(name="redisDataSource") public RedisDataSource redisDataSource(){ JedisPoolConfig poolConfig=new JedisPoolConfig(); poolConfig.setMaxTotal(1000); poolConfig.setMaxIdle(32); poolConfig.setTestOnBorrow(true); poolConfig.setMaxWaitMillis(100*1000); System.out.println(masterName+":"+sentinal_hostAndPorts); JedisSentinelPoolExt jedisSentinelPoolExt=new JedisSentinelPoolExt(masterName, sentinal_hostAndPorts, poolConfig, 100, 200, 10, true); RedisDataSourceImpl dataSourceImpl =new RedisDataSourceImpl(); dataSourceImpl.setJedisSentinelPool(jedisSentinelPoolExt); return dataSourceImpl; } }
properties:
#redis #masterName redis.masterName=mymaster #sentinal-HostAndPort redis.sentinals=192.168.1.107:26379
核心代码(利用前面学过的):
import org.apache.commons.lang3.StringUtils; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import redis.clients.jedis.Protocol; import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisException; import redis.clients.util.Pool; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; /** * 高级哨兵池 */ public class JedisSentinelPoolExt extends Pool<Jedis> { protected GenericObjectPoolConfig poolConfig; protected int connectionTimeout = Protocol.DEFAULT_TIMEOUT; protected int soTimeout = Protocol.DEFAULT_TIMEOUT; protected String password; protected int database = Protocol.DEFAULT_DATABASE; protected String clientName; protected Set<MasterListener> masterListeners = new HashSet<MasterListener>(); protected Set<SlaveDownListener> slaveDownListeners = new HashSet<SlaveDownListener>(); protected Set<SlaveUpListener> slaveUpListeners = new HashSet<SlaveUpListener>(); protected List<HostAndPort> readHaps = new ArrayList<HostAndPort>(); protected List<JedisPoolExt> jedisReadPools = new CopyOnWriteArrayList<JedisPoolExt>(); protected Set<String> _sentinels = new LinkedHashSet<String>(); protected Map<HostAndPort, Integer> failMap = new ConcurrentHashMap<HostAndPort, Integer>(); private String masterName; private int failMax; protected boolean masterWriteOnly; protected long subscribeRetryWaitTimeMillis = 5000; protected Logger log = Logger.getLogger(getClass().getName()); private volatile JedisFactory factory; private volatile HostAndPort currentHostMaster; public JedisSentinelPoolExt(String masterName, String sentinels, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, int failMax, boolean masterWriteOnly) { this(masterName, sentinels, poolConfig, null, connectionTimeout, soTimeout, failMax, masterWriteOnly, null); } public JedisSentinelPoolExt(String masterName, String sentinels, final GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout, int failMax, boolean masterWriteOnly, String clientName) { this(masterName, sentinels, poolConfig, null, connectionTimeout, soTimeout, failMax, masterWriteOnly, clientName); } public JedisSentinelPoolExt(String masterName, String sentinels, final GenericObjectPoolConfig poolConfig, final String password, int connectionTimeout, int soTimeout, int failMax, boolean masterWriteOnly) { this(masterName, sentinels, poolConfig, password, connectionTimeout, soTimeout, failMax, masterWriteOnly, null); } public JedisSentinelPoolExt(String masterName, String sentinels, final GenericObjectPoolConfig poolConfig, final String password, int connectionTimeout, int soTimeout, int failMax, boolean masterWriteOnly, String clientName) { String[] strings = sentinels.split(","); Collections.addAll(_sentinels, strings); this.poolConfig = poolConfig; this.connectionTimeout = connectionTimeout; this.soTimeout = soTimeout; if (!StringUtils.isBlank(password)) { this.password = password; } this.database = Protocol.DEFAULT_DATABASE; this.failMax = failMax; this.masterName = masterName; this.masterWriteOnly = masterWriteOnly; if (!StringUtils.isBlank(clientName)) { this.clientName = clientName; } HostAndPort master = initSentinels(_sentinels, masterName); initPool(master); initReadPool(); } private void initReadPool() { updateReadPools(readHaps); } /** * 获取写实例 * @return */ public WriteJedis getWriteResource() { return new WriteJedis(getResource()); } /** * 获取读实例 * @return */ public ReadJedis getReadResource() { if (jedisReadPools.size() == 0) { throw new RedisException("there is no jedis for read"); } // 从redis读池中随机获取一个实例 Random rand = new Random(); int randNum = rand.nextInt(jedisReadPools.size()); JedisPoolExt jedisPoolExt = jedisReadPools.get(randNum); HostAndPort hostAndPort = jedisPoolExt.getHostAndPort(); JedisPool jedisPool = jedisPoolExt.getJedisPool(); ReadJedis readJedis; try { Jedis jedis = jedisPool.getResource(); readJedis = new ReadJedis(jedis, jedisPool); } catch (Throwable e) { // 添加失败列表 addFailMap(hostAndPort, jedisPool); // 重试其它连接池 if (jedisReadPools.size() > 1) { while (true) { // 随机获取另一个读节点,仍然连接失败就抛异常 int randNum1 = rand.nextInt(jedisReadPools.size()); if (randNum1 != randNum) { JedisPoolExt jedisPoolExt1 = jedisReadPools.get(randNum1); JedisPool jedisPool1 = jedisPoolExt1.getJedisPool(); try { Jedis jedis = jedisPool1.getResource(); readJedis = new ReadJedis(jedis, jedisPool1); } catch (Exception e1) { addFailMap(hostAndPort, jedisPool); throw new RedisException("JedisSentinelPoolExt getReadResource retry error", e1); } return readJedis; } } } else { throw new RedisException(e); } } return readJedis; } /** * 连接失败的实例,放入失败记录列表,超过指定失败次数的实例,会从读节点列表中丢弃 * @param hostAndPort * @param jedisPool */ private void addFailMap(HostAndPort hostAndPort, JedisPool jedisPool) { if (failMax != 0 && jedisReadPools.size() > 1) { Integer failTimes = failMap.get(hostAndPort); if (failTimes == null) { failTimes = 1; } else { failTimes++; } if (failTimes >= failMax) { failMap.remove(hostAndPort); jedisPool.close(); removeFromReadPool(hostAndPort); } else { failMap.put(hostAndPort, failTimes); } } } public void resetReadPool() { initSentinels(_sentinels, masterName); initReadPool(); failMap.clear(); } public void destroy() { for (MasterListener m : masterListeners) { m.shutdown(); } for (SlaveDownListener m : slaveDownListeners) { m.shutdown(); } for (SlaveUpListener m : slaveUpListeners) { m.shutdown(); } super.destroy(); } public HostAndPort getCurrentHostMaster() { return currentHostMaster; } protected void initPool(HostAndPort master) { if (!master.equals(currentHostMaster)) { currentHostMaster = master; if (factory == null) { factory = new JedisFactory(master.getHost(), master.getPort(), connectionTimeout, soTimeout, password, database, clientName); initPool(poolConfig, factory); } else { factory.setHostAndPort(currentHostMaster); // although we clear the pool, we still have to check the // returned object // in getResource, this call only clears idle instances, not // borrowed instances internalPool.clear(); } log.info("Created JedisPool to master at " + master); } } protected HostAndPort initSentinels(Set<String> sentinels, final String masterName) { HostAndPort master = null; boolean sentinelAvailable = false; log.info("Trying to find master from available Sentinels..."); for (String sentinel : sentinels) { final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); log.fine("Connecting to Sentinel " + hap); Jedis jedis = null; try { jedis = new Jedis(hap.getHost(), hap.getPort()); List<String> masterAddr = jedis.sentinelGetMasterAddrByName(masterName); // connected to sentinel... sentinelAvailable = true; if (masterAddr == null || masterAddr.size() != 2) { log.warning("Can not get master addr, master name: " + masterName + ". Sentinel: " + hap + "."); continue; } master = toHostAndPort(masterAddr); log.fine("Found Redis master at " + master); initReadHaps(jedis, masterName, master); break; } catch (JedisConnectionException e) { log.warning("Cannot connect to sentinel running @ " + hap + ". Trying next one."); } finally { if (jedis != null) { jedis.close(); } } } if (master == null) { if (sentinelAvailable) { // can connect to sentinel, but master name seems to not // monitored throw new JedisException("Can connect to sentinel, but " + masterName + " seems to be not monitored..."); } else { throw new JedisConnectionException("All sentinels down, cannot determine where is " + masterName + " master is running..."); } } log.info("Redis master running at " + master + ", starting Sentinel listeners..."); for (MasterListener masterListener : masterListeners) { masterListener.shutdown(); } masterListeners.clear(); for (SlaveDownListener slaveDownListener : slaveDownListeners) { slaveDownListener.shutdown(); } slaveDownListeners.clear(); for (SlaveUpListener slaveUpListener : slaveUpListeners) { slaveUpListener.shutdown(); } slaveUpListeners.clear(); for (String sentinel : sentinels) { final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":"))); MasterListener masterListener = new MasterListener(masterName, hap.getHost(), hap.getPort()); masterListeners.add(masterListener); masterListener.start(); SlaveDownListener slaveDownListener = new SlaveDownListener(masterName, hap.getHost(), hap.getPort()); slaveDownListeners.add(slaveDownListener); slaveDownListener.start(); SlaveUpListener slaveUpListener = new SlaveUpListener(masterName, hap.getHost(), hap.getPort()); slaveUpListeners.add(slaveUpListener); slaveUpListener.start(); } return master; } private void initReadHaps(Jedis jedis, String masterName, HostAndPort master) { List<Map<String, String>> slaveList = jedis.sentinelSlaves(masterName); initReadHaps(slaveList, master); } /** * 初始化读池 * @param slaveList * @param master * @return */ private List<HostAndPort> initReadHaps(List<Map<String, String>> slaveList, HostAndPort master) { // master允许读时,才放入读池 if (!masterWriteOnly) { readHaps = createReadHaps(slaveList, master); } else { readHaps = createReadHaps(slaveList); } return readHaps; } /** * 将slave节点作为读节点放入读池 * @param slaveList * @return */ private List<HostAndPort> createReadHaps(List<Map<String, String>> slaveList) { List<HostAndPort> haps = new ArrayList<>(); for (Map<String, String> slave : slaveList) { HostAndPort hap = new HostAndPort(slave.get("ip"), Integer.valueOf(slave.get("port"))); haps.add(hap); } return haps; } private List<HostAndPort> createReadHaps(List<Map<String, String>> slaveList, HostAndPort master) { List<HostAndPort> haps = createReadHaps(slaveList); haps.add(master); return haps; } /** * 读池增加新节点 * @param hostAndPort */ protected synchronized void insertReadPool(HostAndPort hostAndPort) { boolean exists = false; for (JedisPoolExt oldJedisPool : jedisReadPools) { if (oldJedisPool.getHostAndPort().equals(hostAndPort)) { exists = true; break; } } if (!exists) { JedisPoolExt newJedisPool = new JedisPoolExt(poolConfig, hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout, soTimeout, password, Protocol.DEFAULT_DATABASE, clientName); jedisReadPools.add(newJedisPool); // 新来的节点,可能是之前挂掉后恢复的节点,所以要检查其是否存在于失败列表中,要去除 failMap.remove(hostAndPort); log.info("Add JedisReadPool at " + hostAndPort); } } /** * 读池中去掉一个读节点 * @param hostAndPort */ protected synchronized void removeFromReadPool(HostAndPort hostAndPort) { for (JedisPoolExt jedisPoolExt : jedisReadPools) { if (jedisPoolExt.getHostAndPort().equals(hostAndPort)) { jedisReadPools.remove(jedisPoolExt); log.info("Remove JedisReadPool at " + hostAndPort); break; } } } /** * 替换读池 * @param newHaps */ protected synchronized void updateReadPools(List<HostAndPort> newHaps) { // 拿来新的读节点,在老池里补上 for (HostAndPort hap : newHaps) { insertReadPool(hap); } // 把老池里有,但不存在于新节点里的节点去掉 for (int i = 0; i < jedisReadPools.size(); i++) { boolean needRemove = true; for (HostAndPort hap : newHaps) { if (jedisReadPools.get(i).getHostAndPort().equals(hap)) { needRemove = false; break; } } if (needRemove) { jedisReadPools.remove(i); i--; } } } protected HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) { String host = getMasterAddrByNameResult.get(0); int port = Integer.parseInt(getMasterAddrByNameResult.get(1)); return new HostAndPort(host, port); } @Override public Jedis getResource() { while (true) { Jedis jedis = super.getResource(); jedis.setDataSource(this); // get a reference because it can change concurrently final HostAndPort master = currentHostMaster; final HostAndPort connection = new HostAndPort(jedis.getClient().getHost(), jedis.getClient() .getPort()); if (master.equals(connection)) { // connected to the correct master return jedis; } else { jedis.close(); } } } /** * @deprecated starting from Jedis 3.0 this method won't exist. Resouce cleanup should be done * using @see {@link Jedis#close()} */ public void returnBrokenResource(final Jedis resource) { if (resource != null) { returnBrokenResourceObject(resource); } } /** * @deprecated starting from Jedis 3.0 this method won't exist. Resouce cleanup should be done * using @see {@link Jedis#close()} */ public void returnResource(final Jedis resource) { if (resource != null) { resource.resetState(); returnResourceObject(resource); } } /** * master切换事件监听 */ protected class MasterListener extends Thread { protected String masterName; protected String host; protected int port; protected Jedis j; protected AtomicBoolean running = new AtomicBoolean(false); public MasterListener(String masterName, String host, int port) { this.masterName = masterName; this.host = host; this.port = port; } public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 3) { if (masterName.equals(switchMasterMsg[0])) { initPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); if (masterWriteOnly) { // 如果master只做写,则将新的master从读池去掉 removeFromReadPool(toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]))); } else { // 如果master同时允许读写,则将旧master从读池去掉 removeFromReadPool(toHostAndPort(Arrays.asList(switchMasterMsg[1], switchMasterMsg[2]))); } } else { log.fine("Ignoring message on +switch-master for master name " + switchMasterMsg[0] + ", our master name is " + masterName); } } else { log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +switch-master: " + message); } } }, "+switch-master"); } catch (JedisConnectionException e) { runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis); } } } public void shutdown() { shutdownListener(j, running, host, port); } } /** * slave故障恢复&新增slave节点 事件监听 */ protected class SlaveUpListener extends Thread { protected String masterName; protected String host; protected int port; protected Jedis j; protected AtomicBoolean running = new AtomicBoolean(false); public SlaveUpListener(String masterName, String host, int port) { this.masterName = masterName; this.host = host; this.port = port; } public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 7) { String slaveHost = switchMasterMsg[2]; int slavePort = Integer.valueOf(switchMasterMsg[3]); HostAndPort hap = new HostAndPort(slaveHost, slavePort); insertReadPool(hap); } else { log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel " + channel + ": " + message); } } }, "-sdown", "+slave"); } catch (JedisConnectionException e) { runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis); } } } public void shutdown() { shutdownListener(j, running, host, port); } } protected void shutdownListener(Jedis j, AtomicBoolean running, String host, int port) { try { log.fine("Shutting down listener on " + host + ":" + port); running.set(false); j.disconnect(); } catch (Exception e) { log.log(Level.SEVERE, "Caught exception while shutting down: ", e); } } protected void runningSleep(boolean running, String host, int port, long waitTimeMillis) { if (running) { log.severe("Lost connection to Sentinel at " + host + ":" + port + ". Sleeping " + waitTimeMillis + "ms and retrying."); try { Thread.sleep(waitTimeMillis); } catch (InterruptedException e1) { e1.printStackTrace(); } } else { log.fine("Unsubscribing from Sentinel at " + host + ":" + port); } } /** * slave挂掉事件监听 */ protected class SlaveDownListener extends Thread { protected String masterName; protected String host; protected int port; protected Jedis j; protected AtomicBoolean running = new AtomicBoolean(false); public SlaveDownListener(String masterName, String host, int port) { this.masterName = masterName; this.host = host; this.port = port; } public void run() { running.set(true); while (running.get()) { j = new Jedis(host, port); try { j.subscribe(new JedisPubSub() { @Override public void onMessage(String channel, String message) { log.info("Sentinel " + host + ":" + port + " published: " + channel + " " + message + "."); String[] switchMasterMsg = message.split(" "); if (switchMasterMsg.length > 7) { String slaveHost = switchMasterMsg[2]; int slavePort = Integer.valueOf(switchMasterMsg[3]); HostAndPort hap = new HostAndPort(slaveHost, slavePort); removeFromReadPool(hap); } else if (switchMasterMsg.length != 4) { // master +sdown length=4 log.severe("Invalid message received on Sentinel " + host + ":" + port + " on channel +sdown: " + message); } } }, "+sdown"); } catch (JedisConnectionException e) { runningSleep(running.get(), host, port, subscribeRetryWaitTimeMillis); } } } public void shutdown() { shutdownListener(j, running, host, port); } } }
代码包:地址
使用:
@Controller public class HomeController { @Autowired private RedisDataSource redisDataSource; private static int z=1; @RequestMapping(value = "/setKey", method = RequestMethod.GET) @ResponseBody public String setKey() { z++; String ss=String.valueOf(z); redisDataSource.getRedisWriteClient().set(ss, ss); return ss; } @RequestMapping(value = "/getKey", method = RequestMethod.GET) @ResponseBody public String getKey(HttpServletRequest request) { String ss=request.getParameter("key"); return redisDataSource.getRedisReadClient().get(ss); }
相关文章推荐
- Redis 主从复制、读写分离、高可用(七)-part 2
- Redis 主从复制、读写分离、高可用(七)-part 1
- windows下redis基础操作与主从复制 从而 数据备份和读写分离
- 【中间件】Redis 实战之主从复制、高可用、分布式
- 分布式架构高可用架构篇_08_MyCat在MySQL主从复制基础上实现读写分离
- MySQL高可用--主从复制与读写分离
- 高可用架构篇--MyCat在MySQL主从复制基础上实现读写分离
- windows下redis基础操作与主从复制 从而 数据备份和读写分离
- redis的主从复制+高可用简单部署
- Redis 高可用--主从复制
- Redis模拟主从复制,读写分离
- Redis 高可用:Redis Sentinel 主从复制故障转移
- Sentinel-Redis高可用方案(一):主从复制
- 高可用架构篇--MyCat在MySQL主从复制基础上实现读写分离
- Redis集群之主从复制,读写分离(上)(五)
- Redis 主从复制高可用方案
- Sentinel-Redis高可用方案(一):主从复制
- redis实现主从复制和高可用(主从切换)
- KeepAlived+Redis | 高可用 | 主从复制 | 健康检查 | 故障自动切换
- redis高可用主从复制