您的位置:首页 > 数据库 > Redis

redis 集群部署及分布式锁的实现

2018-03-16 16:38 591 查看

一、redis集群的部署

安装redis
确保安装文件夹有redis-trib.rb文件,通过rudy构建redis集群

安装ruby环境
配置好环境变量,gem install redis 安装redis依赖
详细环境安装教程:点击打开链接
集群搭建      redis集群最小包含3个主节点,并且每个节点都应该部署在不同的服务器上,这里测试建立3个主节点和三个从节点的redis集群,并部署在本地机器上,分别监听不同的端口(7000,7001,7100,7101,7200,7201),7000、7100、7200为三个主节点,7001、7101、7201分别为它们对应的从节点。
     在redis安装目录建立cluster文件夹,在这个目录下为这六个redis节点新建六个文件夹,名称为各自的端口名,再在建好的文件里创建如redis_7000.conf配置文件,内容为:port 7000
#绑定监听的IP地址,默认为127.0.0.1
#bind 172.16.10.49
appendonly yes
appendfilename "appendonly_7000.aof"

# // 如果要设置最大内存空间,可添加如下两句
maxmemory 200mb
maxmemory-policy allkeys-lru

cluster-enabled yes
cluster-config-file nodes_7000.conf
cluster-node-timeout 15000
cluster-slave-validity-factor 10
cluster-migration-barrier 1
cluster-require-full-coverage yes
端口号7000都要改成各自对应的端口号,IP地址这里测试就用默认127.0.0.1,建议使用电脑的本地ip地址。cluster-config-file参数指定了redis集群节点的配置文件名称,创建集群后生成,内容如下:
0adf165a965376913d6a4d426784371e6b0d7cf6 127.0.0.1:7101 slave 2ec1abbf3bcd7758281bcc7b6762532084c0b60c 0 1521124105758 5 connected
8d8f817d997b57ccb10049208738d5ebfdd5c2fe 127.0.0.1:7001 master - 0 1521124104759 7 connected 0-5460
9334416471942fd042f704ae8a345ab6c1adcd37 127.0.0.1:7201 slave b7fedf227aec4fc2911bdf7de2566b73e0a88045 0 1521124107358 9 connected
1030b137672c69175e7cf1d99be325329e016422 127.0.0.1:7000 myself,slave 8d8f817d997b57ccb10049208738d5ebfdd5c2fe 0 0 1 connected
b7fedf227aec4fc2911bdf7de2566b73e0a88045 127.0.0.1:7200 master - 0 1521124100754 9 connected 10923-16383
2ec1abbf3bcd7758281bcc7b6762532084c0b60c 127.0.0.1:7100 master - 0 1521124106758 2 connected 5461-10922
vars currentEpoch 9 lastVoteEpoch 0
如果配置集群后,想修改各节点的信息,如IP地址端口等,可以在这里修改,但必须每个节点下的都要改。

然后使用如下命令安装各服务,并启动
  安装   redis-server --service-install cluster/7100/redis_7100.conf --service-name redis7100   

  启动   redis-server --service-start --service-name redis7100                                               

删除服务 redis-server --service-uninstall --service-name redis7100
现在已经启动了六个redis服务,但六个redis并没有联系,没有实现集群。
创建集群:
ruby redis-trib.rb create --replicas 1 127.0.0.1:700 0 127.0.0.1:7100 127.0.0.1:7200 127.0.0.1:7001 127.0.0.1:7101 127.0.0.1:7201
使用redis-trib.rb命令创建集群,ruby可以去掉。命令运行成功,则集群就创建成功了。
连接集群:
redis-cli -c -p 7200 -h 127.0.0.1
这里连接任何一个节点都可以。

redis集群工作

set keya valuea
1、先对keya 计算值
2、对16384(总槽点数)取余(得到槽点)
3、通过槽点找到对应的节点
4、在这个节点执行set keya valuea

节点,分为主节点和从节点,一个主节点下可以有多个从节点。槽点值分配在主节点上,redis中的key-value就是
存储在槽点上。
当主节点挂掉好,选举一个从节点成为主节点,若该主节点没有从节点,则集群处于fail状态,或有半数以上的主
节点挂掉,集群也处于fail状态
主节点会把key—value写入从节点

二、redis实现分布式锁

redis分布式的实现原理:
    1、通过setNX操作,如果存在key,不操作;不存在,才会set值,保证锁的互斥性
    2、value设置锁的到期时间,当锁超时时,进行getAndSet操作,先get旧值,再set新值,避免发生死锁。这里也可以通过设置key的有效期来避免死锁,但是setNx和exprise(设置有效期)操作非原子性,可能发生锁没有设置有效时间的问题,从而发生死锁。

实现:

spring boot 通过jdeis连接redsi集群
redis配置文件:default.redis.maxRedirects=3
#连接池中最大连接数。高版本:maxTotal,低版本:maxActive
default.redis.maxTotal=20
#连接池中最大空闲的连接数
default.redis.maxIdle=10
#连接池中最少空闲的连接数
default.redis.minIdle=1
#当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时。高版本:maxWaitMillis,低版本:maxWait
default.redis.maxWaitMillis=3000
#连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除
default.redis.minEvictableIdleTimeMillis=-1
#对于“空闲链接”检测线程而言,每次检测的链接资源的个数。默认为3
default.redis.numTestsPerEvictionRun=3
#“空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1
default.redis.timeBetweenEvictionRunsMillis=-1
#向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值
default.redis.testOnBorrow=false
#连超时设置
default.redis.timeout=15000
#是否使用连接池
default.redis.usePool=true
#host&port
# 建议使用实际ip地址创建集群
default.redis.nodes[0]=127.0.0.1:7000
default.redis.nodes[1]=127.0.0.1:7001
default.redis.nodes[2]=127.0.0.1:7100
default.redis.nodes[3]=127.0.0.1:7101
default.redis.nodes[4]=127.0.0.1:7200
default.redis.nodes[5]=127.0.0.1:7201
通过@ConfigurationProperties注解读取配置信息:@Component
@ConfigurationProperties(prefix = "default.redis")
public class RedisProperties {
private int maxRedirects;
private int maxTotal;
private int maxIdle;
private int minIdle;
private int maxWaitMillis;
private int minEvictableIdleTimeMillis;
private int numTestsPerEvictionRun;
private int timeBetweenEvictionRunsMillis;
private boolean testOnBorrow;
private int timeout;
private boolean usePool;
private List<String> nodes;

public int getMaxRedirects() {
return maxRedirects;
}

public void setMaxRedirects(int maxRedirects) {
this.maxRedirects = maxRedirects;
}

public int getMaxTotal() {
return maxTotal;
}

public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}

public int getMaxIdle() {
return maxIdle;
}

public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}

public int getMinIdle() {
return minIdle;
}

public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}

public int getMaxWaitMillis() {
return maxWaitMillis;
}

public void setMaxWaitMillis(int maxWaitMillis) {
this.maxWaitMillis = maxWaitMillis;
}

public int getMinEvictableIdleTimeMillis() {
return minEvictableIdleTimeMillis;
}

public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
this.minEvictableIdleTimeMillis = minEvictableIdleTimeMillis;
}

public int getNumTestsPerEvictionRun() {
return numTestsPerEvictionRun;
}

public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
this.numTestsPerEvictionRun = numTestsPerEvictionRun;
}

public int getTimeBetweenEvictionRunsMillis() {
return timeBetweenEvictionRunsMillis;
}

public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
this.timeBetweenEvictionRunsMillis = timeBetweenEvictionRunsMillis;
}

public boolean isTestOnBorrow() {
return testOnBorrow;
}

public void setTestOnBorrow(boolean testOnBorrow) {
this.testOnBorrow = testOnBorrow;
}

public int getTimeout() {
return timeout;
}

public void setTimeout(int timeout) {
this.timeout = timeout;
}

public boolean isUsePool() {
return usePool;
}

public void setUsePool(boolean usePool) {
this.usePool = usePool;
}

public List<String> getNodes() {
return nodes;
}

public void setNodes(List<String> nodes) {
this.nodes = nodes;
}

}
redis配置类,生成redisTemplate@Configuration
public class RedisConfig {

@Autowired
private RedisProperties redisProperties;

@Bean
public RedisClusterConfiguration redisClusterConfiguration() {
RedisClusterConfiguration red
e7af
isClusterConfiguration = new RedisClusterConfiguration();
redisClusterConfiguration.setClusterNodes(getRedisNode());
redisClusterConfiguration.setMaxRedirects(redisProperties.getMaxRedirects());
return redisClusterConfiguration;
}

@Bean
public JedisPoolConfig jedisPoolConfig() {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxIdle(redisProperties.getMaxIdle());
jedisPoolConfig.setMaxTotal(redisProperties.getMaxTotal());
jedisPoolConfig.setMinIdle(redisProperties.getMinIdle());
jedisPoolConfig.setMaxWaitMillis(redisProperties.getMaxWaitMillis());
jedisPoolConfig.setNumTestsPerEvictionRun(redisProperties.getNumTestsPerEvictionRun());
jedisPoolConfig.setTimeBetweenEvictionRunsMillis(redisProperties.getTimeBetweenEvictionRunsMillis());
jedisPoolConfig.setTestOnBorrow(redisProperties.isTestOnBorrow());
return jedisPoolConfig;
}

@Bean
public JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration redisClusterConfiguration, JedisPoolConfig jedisPoolConfig) {
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration, jedisPoolConfig);
jedisConnectionFactory.setTimeout(redisProperties.getTimeout());
return jedisConnectionFactory;
}

@Bean
public RedisTemplate redisTemplate(JedisConnectionFactory jedisConnectionFactory) {
RedisTemplate redisTemplate = new RedisTemplate();
redisTemplate.setConnectionFactory(jedisConnectionFactory);
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}

private List<RedisNode> getRedisNode() {
List<String> nodes = redisProperties.getNodes();
if (CommonUtils.isNotEmpty(nodes)) {
List<RedisNode> redisNodes = nodes.stream().map(node -> {
String[] ss = node.split(":");
return new RedisNode(ss[0], Integer.valueOf(ss[1]));
}).collect(Collectors.toList());
return redisNodes;
}
return new ArrayList<>();
}
}
redis锁的实现:@Component
public class RedisLock {

private static final Logger log = LoggerFactory.getLogger(RedisLock.class);
/* 默认锁的有效时间30s */
private static final int DEFAULT_LOCK_EXPIRSE_MILL_SECONDS = 30 * 1000;
/* 默认请求锁等待超时时间10s */
private static final int DEFAULT_LOCK_WAIT_DEFAULT_TIME_OUT = 10 * 1000;
/* 默认的轮询获取锁的间隔时间 */
private static final int DEFAULT_LOOP_WAIT_TIME = 150;
/* 锁的key前缀 */
private static final String LOCK_PREFIX = "LOCK_";

/* 是否获得锁的标志 */
private boolean lock = false;
/* 锁的key */
private String lockKey;
/* 锁的有效时间(ms) */
private int lockExpirseTimeout;
/* 请求锁的阻塞时间(ms) */
private int lockWaitTimeout;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

public boolean isLock() {
return lock;
}

public void setLock(boolean lock) {
this.lock = lock;
}

public String getLockKey() {
return lockKey;
}

public void setLockKey(String lockKey) {
this.lockKey = LOCK_PREFIX + lockKey;
}

public int getLockExpirseTimeout() {
return lockExpirseTimeout;
}

public void setLockExpirseTimeout(int lockExpirseTimeout) {
this.lockExpirseTimeout = lockExpirseTimeout;
}

public int getLockWaitTimeout() {
return lockWaitTimeout;
}

public void setLockWaitTimeout(int lockWaitTimeout) {
this.lockWaitTimeout = lockWaitTimeout;
}

public void setRedisTemplate(RedisTemplate<String, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}

public RedisLock() {
}

public RedisLock(String lockKey, int lockExpirseTimeout, int lockWaitTimeout) {
this.lockKey = LOCK_PREFIX + lockKey;
this.lockExpirseTimeout = lockExpirseTimeout;
this.lockWaitTimeout = lockWaitTimeout;
}

public RedisLock newInstance(String lockKey) {
RedisLock redisLock = new RedisLock(lockKey, DEFAULT_LOCK_EXPIRSE_MILL_SECONDS, DEFAULT_LOCK_WAIT_DEFAULT_TIME_OUT);
redisLock.setRedisTemplate(this.redisTemplate);
return redisLock;
}

public RedisLock newInstance(String lockKey, int lockExpirseTimeout, int lockWaitTimeout) {
if (lockExpirseTimeout == 0 || lockWaitTimeout == 0) {
lockExpirseTimeout = DEFAULT_LOCK_EXPIRSE_MILL_SECONDS;
lockWaitTimeout = DEFAULT_LOCK_WAIT_DEFAULT_TIME_OUT;
}
RedisLock redisLock = new RedisLock(lockKey, lockExpirseTimeout, lockWaitTimeout);
redisLock.setRedisTemplate(this.redisTemplate);
return redisLock;
}

public boolean setIfAbsent(String expirseTimeStr) {
// setIfAbsent通过jedis的setNx实现
return this.redisTemplate.opsForValue().setIfAbsent(this.lockKey, expirseTimeStr);
}

public String getAndSet(String expiresTimeStr) {
// 获取原来的值,并设置新的值,原子操作
return (String) this.redisTemplate.opsForValue().getAndSet(this.lockKey, expiresTimeStr);
}

/**
* 1、获得当前系统时间,计算锁的到期时间
* 2、setNx操作,加锁
* 3、如果,加锁成功,设置锁的到期时间,返回true;取锁失败,取出当前锁的value(到期时间)
* 4、如果value不为空而且小于当前系统时间,进行getAndSet操作,重新设置value,并取出旧value;否则,等待间隔时间后,重复步骤2;
* 5、如果步骤3和4取出的value一样,加锁成功,设置锁的到期时间,返回true;否则,别人加锁成功,恢复锁的value,等待间隔时间后,重复步骤2。
*/
public boolean lock() {
log.info("{}-----尝试获取锁...", Thread.currentThread().getName());
int lockWaitMillSeconds = this.lockWaitTimeout;
// key 的值,表示key的到期时间
String redisValue = String.valueOf(System.currentTimeMillis() + this.lockExpirseTimeout);
while (lockWaitMillSeconds > 0) {
lock = setIfAbsent(redisValue);
if (lock) {
// 拿到锁,设置锁的有效期,这里可能因为故障没有被执行,锁会一直存在,这时就需要value的有效期去判断锁是否失效
this.redisTemplate.expire(this.lockKey, lockExpirseTimeout, TimeUnit.MILLISECONDS);
log.info("{}-----获得锁", Thread.currentThread().getName());
return lock;
} else {
// 锁存在,判断锁有没有过期
String oldValue = (String) this.redisTemplate.opsForValue().get(this.lockKey);
if (CommonUtils.isNotEmpty(oldValue) && Long.parseLong(oldValue) < System.currentTimeMillis()) {
// 锁的到期时间小于当前时间,说明锁已失效, 修改value,获得锁
String currentRedisValue = getAndSet(String.valueOf(lockExpirseTimeout + System.currentTimeMillis()));
// 如果两个值不相等,说明有另外一个线程拿到了锁,阻塞
if (currentRedisValue.equals(oldValue)) {
// 如果修改的锁的有效期之前没被其他线程修改,则获得锁, 设置锁的超时时间
redisTemplate.expire(this.lockKey, lockExpirseTimeout, TimeUnit.MILLISECONDS);
log.info("{}-----获得锁", Thread.currentThread().getName());
this.lock = true;
return this.lock;
} else {
// 有另外一个线程获得了这个超时的锁,不修改锁的value
redisTemplate.opsForValue().set(this.lockKey, currentRedisValue);
}
}
}
// 减掉固定轮询获取锁的间隔时间
lockWaitMillSeconds -= DEFAULT_LOOP_WAIT_TIME;
try {
log.info("{}-----等待{}ms后,再尝试获取锁...", Thread.currentThread().getName(), DEFAULT_LOOP_WAIT_TIME);
// 取锁失败时,应该在随机延时后进行重试,避免不同客户端同时重试导致谁都无法拿到锁的情况出现,也可以采用等待队列的方式
Thread.sleep(DEFAULT_LOOP_WAIT_TIME);
} catch (InterruptedException e) {
log.error("redis 同步锁出现未知异常", e);
}
}
log.info("{}-----请求锁超时,获得锁失败", Thread.currentThread().getName());
return false;
}

public void unlock() {
if (lock) {
this.redisTemplate.delete(this.lockKey);
this.lock = false;
}
}

}
加锁过程:
1、获得当前系统时间,计算锁的到期时间
2、setNx操作,加锁
3、如果,加锁成功,设置锁的到期时间,返回true;取锁失败,取出当前锁的value(到期时间)
4、如果value不为空而且小于当前系统时间,进行getAndSet操作,重新设置value,并取出旧value;否则,等待间隔时间后,重复步骤2;
5、如果步骤3和4取出的value一样,加锁成功,设置锁的到期时间,返回true;否则,别人加锁成功,恢复锁的value,等待间隔时间后,重复步骤2。

这里设置锁的到期时间,只是为了减少后面复杂逻辑的执行。

测试:

测试类
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class TestRedis {
private CountDownLatch countDownLatch = new CountDownLatch(2);

@Test
public void testRedisLock() throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
RedisLock lock = ((RedisLock) SpringContextUtil.getBean("redisLock")).newInstance("test");
if (lock.lock()) {
System.out.println("work1获得锁");
System.out.println("work1 工作15s...");
try {
Thread.sleep(15000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work1完成工作,释放锁");
lock.unlock();
}
countDownLatch.countDown();
}
},"work1").start();
new Thread(new Runnable() {
@Override
public void run() {
RedisLock lock = ((RedisLock) SpringContextUtil.getBean("redisLock")).newInstance("test");
if (lock.lock()) {
System.out.println("work2获得锁");
System.out.println("work2 工作5s...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("work2完成工作,释放锁");
lock.unlock();
}
countDownLatch.countDown();
}
}, "work2").start();
// 等待两个线程完成,才完成主线程
countDownLatch.await();
}
}

这里在单元测试中起两个线程work1和work2,work1模拟工作15s,work2模拟工作5s。这里为了juint的主线程不会在两个work线程完成工作之前就停止,用到了CountDownLatch,让主线程在两个work线程完成前等待。
上面测试有两种结果,第一种,work2先拿到锁,工作5s,work1等待(默认等待超时时间10s),等待过程中一值尝试获取锁(默认间隔150ms),默认锁的有效期30s,显然,5s后work2完成工作释放锁,work1获得锁,work1和work2都正常完成了工作。结果如下:2018-03-16 14:16:50,579 5576 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----尝试获取锁...
2018-03-16 14:16:50,579 5576 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----尝试获取锁...
2018-03-16 14:16:50,598 5595 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----等待150ms后,再尝试获取锁...
2018-03-16 14:16:50,599 5596 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----获得锁
work2获得锁
work2 工作5s...
2018-03-16 14:16:50,748 5745 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----等待150ms后,再尝试获取锁...
。。。省略重复尝试获取锁日志。。。
2018-03-16 14:16:55,571 10568 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----等待150ms后,再尝试获取锁...
work2完成工作,释放锁
2018-03-16 14:16:55,722 10719 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----获得锁
work1获得锁
work1 工作15s...
work1完成工作,释放锁
第二种结果,work1先拿到锁,工作15s,work2等待,这里,因为work1的工作时间超过了默认的等待超时时间10s,所以work2在work1完成工作释放锁之前就因为等待超时而获取锁失败,不能完成工作,结果如下:2018-03-16 14:22:45,292 5448 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----尝试获取锁...
2018-03-16 14:22:45,292 5448 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----尝试获取锁...
2018-03-16 14:22:45,308 5464 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----等待150ms后,再尝试获取锁...
2018-03-16 14:22:45,309 5465 [work1] INFO c.s.component.redis.lock.RedisLock - work1-----获得锁
work1获得锁
work1 工作15s...
2018-03-16 14:22:45,459 5615 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----等待150ms后,再尝试获取锁...
。。。省略重复尝试获取锁日志。。。
2018-03-16 14:22:55,251 15407 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----等待150ms后,再尝试获取锁...
2018-03-16 14:22:55,401 15557 [work2] INFO c.s.component.redis.lock.RedisLock - work2-----请求锁超时,获得锁失败
。。。这里还要等待大概5s。。。
work1完成工作,释放锁
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  redis 集群 分布式锁
相关文章推荐