死磕 java同步系列之redis分布式鎖進化史

問題

(1)redis如何實現分布式鎖?

(2)redis分布式鎖有哪些優點?

(3)redis分布式鎖有哪些缺點?

(4)redis實現分布式鎖有沒有現成的輪子可以使用?

簡介

Redis(全稱:Remote Dictionary Server 遠程字典服務)是一個開源的使用ANSI C語言編寫、支持網絡、可基于內存亦可持久化的日志型、Key-Value數據庫,并提供多種語言的API。

本章我們將介紹如何基于redis實現分布式鎖,并把其實現的進化史從頭到尾講明白,以便大家在面試的時候能講清楚redis分布式鎖的來(忽)龍(悠)去(考)脈(官)。

實現鎖的條件

基于前面關于鎖(分布式鎖)的學習,我們知道實現鎖的條件有三個:

(1)狀態(共享)變量,它是有狀態的,這個狀態的值標識了是否已經被加鎖,在ReentrantLock中是通過控制state的值實現的,在ZookeeperLock中是通過控制子節點來實現的;

(2)隊列,它是用來存放排隊的線程,在ReentrantLock中是通過AQS的隊列實現的,在ZookeeperLock中是通過子節點的有序性實現的;

(3)喚醒,上一個線程釋放鎖之后喚醒下一個等待的線程,在ReentrantLock中結合AQS的隊列釋放時自動喚醒下一個線程,在ZookeeperLock中是通過其監聽機制來實現的;

那么上面三個條件是不是必要的呢?

其實不然,實現鎖的必要條件只有第一個,對共享變量的控制,如果共享變量的值為null就給他設置個值(java中可以使用CAS操作進程內共享變量),如果共享變量有值則不斷重復檢查其是否有值(重試),待鎖內邏輯執行完畢再把共享變量的值設置回null。

說白了,只要有個地方存這個共享變量就行了,而且要保證整個系統(多個進程)內只有這一份即可。

這也是redis實現分布式鎖的關鍵【本篇文章由公眾號“彤哥讀源碼”原創】。

redis分布式鎖進化史

進化史一——set

既然上面說了實現分布式鎖只需要對共享變量控制到位即可,那么redis我們怎么控制這個共享變量呢?

首先,我們知道redis的基礎命令有get/set/del,通過這三個命令可以實現分布式鎖嗎?當然可以。

redis

在獲取鎖之前先get lock_user_1看這個鎖存不存在,如果不存在則再set lock_user_1 value,如果存在則等待一段時間后再重試,最后使用完成了再刪除這個鎖del lock_user_1即可。

redis

但是,這種方案有個問題,如果一開始這個鎖是不存在的,兩個線程去同時get,這個時候返回的都是null(nil),然后這兩個線程都去set,這時候就出問題了,兩個線程都可以set成功,相當于兩個線程都獲取到同一個鎖了。

所以,這種方案不可行!

進化史二——setnx

上面的方案不可行的主要原因是多個線程同時set都是可以成功的,所以后來有了setnx這個命令,它是set if not exist的縮寫,也就是如果不存在就set。

redis

可以看到,當重復對同一個key進行setnx的時候,只有第一次是可以成功的。

因此,方案二就是先使用setnx lock_user_1 value命令,如果返回1則表示加鎖成功,如果返回0則表示其它線程先執行成功了,那就等待一段時間后重試,最后一樣使用del lock_user_1釋放鎖。

redis

但是,這種方案也有個問題,如果獲取鎖的這個客戶端斷線了怎么辦?這個鎖不是一直都不會釋放嗎?是的,是這樣的。

所以,這種方案也不可行!

進化史三——setnx + setex

上面的方案不可行的主要原因是獲取鎖之后客戶端斷線了無法釋放鎖的問題,那么,我在setnx之后立馬再執行setex可以嗎?

答案是可以的,2.6.12之前的版本使用redis實現分布式鎖大家都是這么玩的。

redis

因此,方案三就是先使用setnx lock_user_1 value命令拿到鎖,再立即使用setex lock_user_1 30 value設置過期時間,最后使用del lock_user_1釋放鎖。

在setnx獲取到鎖之后再執行setex設置過期時間,這樣就很大概率地解決了獲取鎖之后客戶端斷線不會釋放鎖的問題。

但是,這種方案依然有問題,如果setnx之后setex之前這個客戶端就斷線了呢?嗯~,似乎無解,不過這種概率實在是非常小,所以2.6.12之前的版本大家也都這么用,幾乎沒出現過什么問題。

所以,這種方案基本可用,只是不太好!

進化史四——set nx ex

上面的方案不太好的主要原因是setnx/setex是兩條獨立的命令,無法解決前者成功之后客戶端斷線的問題,那么,把兩條命令合在一起不就行了嗎?

是的,redis官方也意識到這個問題了,所以2.6.12版本給set命令加了一些參數:

SET key value [EX seconds] [PX milliseconds] [NX|XX]

EX,過期時間,單位秒

PX,過期時間,單位毫秒

NX,not exist,如果不存在才設置成功

XX,exist exist?如果存在才設置成功

通過這個命令我們就再也不怕客戶端無故斷線了【本篇文章由公眾號“彤哥讀源碼”原創】。

redis

因此,方案四就是先使用set lock_user_1 value nx ex 30獲取鎖,獲取鎖之后使用,使用完成了最后del lock_user_1釋放鎖。

然而,這種方案就沒有問題嗎?

當然有問題,其實這里的釋放鎖只要簡單地執行del lock_user_1即可,并不會檢查這個鎖是不是當前客戶端獲取到的。

所以,這種方案還不是很完美。

進化史五——random value + lua script

上面的方案不完美的主要原因是釋放鎖這里控制的還不是很到位,那么有沒有其它方法可以控制釋放鎖的線程和加鎖的線程一定是同一個客戶端呢?

redis官方給出的方案是這樣的:

 // 加鎖
 SET resource_name my_random_value NX PX 30000
 
 // 釋放鎖
 if redis.call("get",KEYS[1]) == ARGV[1] then
     return redis.call("del",KEYS[1])
 else
     return 0
 end

加鎖的時候,設置隨機值,保證這個隨機值只有當前客戶端自己知道。

釋放鎖的時候,執行一段lua腳本,把這段lua腳本當成一個完整的命令,先檢查這個鎖對應的值是不是上面設置的隨機值,如果是再執行del釋放鎖,否則直接返回釋放鎖失敗。

我們知道,redis是單線程的,所以這段lua腳本中的get和del不會存在并發問題,但是不能在java中先get再del,這樣會當成兩個命令,會有并發問題,lua腳本相當于是一個命令一起傳輸給redis的。

這種方案算是比較完美了,但是還有一點小缺陷,就是這個過期時間設置成多少合適呢?

設置的過小,有可能上一個線程還沒執行完鎖內邏輯,鎖就自動釋放了,導致另一個線程可以獲取鎖了,就出現并發問題了;

設置的過大,就要考慮客戶端斷線了,這個鎖要等待很長一段時間。

所以,這里又衍生出一個新的問題,過期時間我設置小一點,但是快到期了它能自動續期就好了。

進化史六——redisson(redis2.8+)

上面方案的缺陷是過期時間不好把握,雖然也可以自己啟一個監聽線程來處理續期,但是代碼實在不太好寫,好在現成的輪子redisson已經幫我們把這個邏輯都實現好了,我們拿過來直接用就可以了。

而且,redisson充分考慮了redis演化過程中留下的各種問題,單機模式、哨兵模式、集群模式,它統統都處理好了,不管是從單機進化到集群還是從哨兵進化到集群,都只需要簡單地修改下配置就可以了,不用改動任何代碼,可以說是非(業)常(界)方(良)便(心)。

redisson實現的分布式鎖內部使用的是Redlock算法,這是官方推薦的一種算法。

另外,redisson還提供了很多分布式對象(分布式的原子類)、分布式集合(分布式的Map/List/Set/Queue等)、分布式同步器(分布式的CountDownLatch/Semaphore等)、分布式鎖(分布式的公平鎖/非公平鎖/讀寫鎖等),有興趣的可以去看看,下面貼出鏈接:

redis

Redlock介紹:https://redis.io/topics/distlock

redisson介紹:https://github.com/redisson/redisson/wiki

代碼實現

因為前面五種方案都已經過時,所以彤哥這里偷個懶,就不去一一實現的,我們直接看最后一種redisson的實現方式。

pom.xml文件

添加spring redis及redisson的依賴,我這里使用的是springboot 2.1.6版本,springboot 1.x版本的自己注意下,查看上面的github可以找到方法。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-data-21</artifactId>
    <version>3.11.0</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.11.0</version>
</dependency>

application.yml文件

配置redis的連接信息,彤哥這里給出了三種方式。

spring:
  redis:
    # 單機模式
    #host: 192.168.1.102
    #port: 6379
    # password: <your passowrd>
    timeout: 6000ms  # 連接超時時長(毫秒)
    # 哨兵模式 【本篇文章由公眾號“彤哥讀源碼”原創】
#    sentinel:
#      master: <your master>
#      nodes: 192.168.1.101:6379,192.168.1.102:6379,192.168.1.103:6379
    # 集群模式(三主三從偽集群)
    cluster:
      nodes:
        - 192.168.1.102:30001
        - 192.168.1.102:30002
        - 192.168.1.102:30003
        - 192.168.1.102:30004
        - 192.168.1.102:30005
        - 192.168.1.102:30006

Locker接口

定義Locker接口。

public interface Locker {
    void lock(String key, Runnable command);
}

RedisLocker實現類

直接使用RedissonClient獲取鎖,注意這里不需要再單獨配置RedissonClient這個bean,redisson框架會根據配置自動生成RedissonClient的實例,我們后面說它是怎么實現的。

@Component
public class RedisLocker implements Locker {

    @Autowired
    private RedissonClient redissonClient;

    @Override
    public void lock(String key, Runnable command) {
        RLock lock = redissonClient.getLock(key);
        try {
            // 【本篇文章由公眾號“彤哥讀源碼”原創】
            lock.lock();
            command.run();
        } finally {
            lock.unlock();
        }
    }
}

測試類

啟動1000個線程,每個線程內部打印一句話,然后睡眠1秒。


@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class RedisLockerTest {

    @Autowired
    private Locker locker;

    @Test
    public void testRedisLocker() throws IOException {
        for (int i = 0; i < 1000; i++) {
            new Thread(()->{
                locker.lock("lock", ()-> {
                    // 可重入鎖測試
                    locker.lock("lock", ()-> {
                        System.out.println(String.format("time: %d, threadName: %s", System.currentTimeMillis(), Thread.currentThread().getName()));
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    });
                });
            }, "Thread-"+i).start();
        }

        System.in.read();
    }
}

運行結果:

可以看到穩定在1000ms左右打印一句話,說明這個鎖是可用的,而且是可重入的。

time: 1570100167046, threadName: Thread-756
time: 1570100168067, threadName: Thread-670
time: 1570100169080, threadName: Thread-949
time: 1570100170093, threadName: Thread-721
time: 1570100171106, threadName: Thread-937
time: 1570100172124, threadName: Thread-796
time: 1570100173134, threadName: Thread-944
time: 1570100174142, threadName: Thread-974
time: 1570100175167, threadName: Thread-462
time: 1570100176180, threadName: Thread-407
time: 1570100177194, threadName: Thread-983
time: 1570100178206, threadName: Thread-982
...

RedissonAutoConfiguration

剛才說RedissonClient不需要配置,其實它是在RedissonAutoConfiguration中自動配置的,我們簡單看下它的源碼,主要看redisson()這個方法:


@Configuration
@ConditionalOnClass({Redisson.class, RedisOperations.class})
@AutoConfigureBefore(RedisAutoConfiguration.class)
@EnableConfigurationProperties({RedissonProperties.class, RedisProperties.class})
public class RedissonAutoConfiguration {

    @Autowired
    private RedissonProperties redissonProperties;
    
    @Autowired
    private RedisProperties redisProperties;
    
    @Autowired
    private ApplicationContext ctx;
    
    @Bean
    @ConditionalOnMissingBean(name = "redisTemplate")
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> template = new RedisTemplate<Object, Object>();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(StringRedisTemplate.class)
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(redisConnectionFactory);
        return template;
    }

    @Bean
    @ConditionalOnMissingBean(RedisConnectionFactory.class)
    public RedissonConnectionFactory redissonConnectionFactory(RedissonClient redisson) {
        return new RedissonConnectionFactory(redisson);
    }
    
    @Bean(destroyMethod = "shutdown")
    @ConditionalOnMissingBean(RedissonClient.class)
    public RedissonClient redisson() throws IOException {
        Config config = null;
        Method clusterMethod = ReflectionUtils.findMethod(RedisProperties.class, "getCluster");
        Method timeoutMethod = ReflectionUtils.findMethod(RedisProperties.class, "getTimeout");
        Object timeoutValue = ReflectionUtils.invokeMethod(timeoutMethod, redisProperties);
        int timeout;
        if(null == timeoutValue){
            // 超時未設置則為0
            timeout = 0;
        }else if (!(timeoutValue instanceof Integer)) {
            // 轉毫秒
            Method millisMethod = ReflectionUtils.findMethod(timeoutValue.getClass(), "toMillis");
            timeout = ((Long) ReflectionUtils.invokeMethod(millisMethod, timeoutValue)).intValue();
        } else {
            timeout = (Integer)timeoutValue;
        }
        
        // 看下是否給redisson單獨寫了一個配置文件
        if (redissonProperties.getConfig() != null) {
            try {
                InputStream is = getConfigStream();
                config = Config.fromJSON(is);
            } catch (IOException e) {
                // trying next format
                try {
                    InputStream is = getConfigStream();
                    config = Config.fromYAML(is);
                } catch (IOException e1) {
                    throw new IllegalArgumentException("Can't parse config", e1);
                }
            }
        } else if (redisProperties.getSentinel() != null) {
            // 如果是哨兵模式
            Method nodesMethod = ReflectionUtils.findMethod(Sentinel.class, "getNodes");
            Object nodesValue = ReflectionUtils.invokeMethod(nodesMethod, redisProperties.getSentinel());
            
            String[] nodes;
            // 看sentinel.nodes這個節點是列表配置還是逗號隔開的配置
            if (nodesValue instanceof String) {
                nodes = convert(Arrays.asList(((String)nodesValue).split(",")));
            } else {
                nodes = convert((List<String>)nodesValue);
            }
            
            // 生成哨兵模式的配置
            config = new Config();
            config.useSentinelServers()
                .setMasterName(redisProperties.getSentinel().getMaster())
                .addSentinelAddress(nodes)
                .setDatabase(redisProperties.getDatabase())
                .setConnectTimeout(timeout)
                .setPassword(redisProperties.getPassword());
        } else if (clusterMethod != null && ReflectionUtils.invokeMethod(clusterMethod, redisProperties) != null) {
            // 如果是集群模式
            Object clusterObject = ReflectionUtils.invokeMethod(clusterMethod, redisProperties);
            Method nodesMethod = ReflectionUtils.findMethod(clusterObject.getClass(), "getNodes");
            // 集群模式的cluster.nodes是列表配置
            List<String> nodesObject = (List) ReflectionUtils.invokeMethod(nodesMethod, clusterObject);
            
            String[] nodes = convert(nodesObject);
            
            // 生成集群模式的配置
            config = new Config();
            config.useClusterServers()
                .addNodeAddress(nodes)
                .setConnectTimeout(timeout)
                .setPassword(redisProperties.getPassword());
        } else {
            // 單機模式的配置
            config = new Config();
            String prefix = "redis://";
            Method method = ReflectionUtils.findMethod(RedisProperties.class, "isSsl");
            // 判斷是否走ssl
            if (method != null && (Boolean)ReflectionUtils.invokeMethod(method, redisProperties)) {
                prefix = "rediss://";
            }
            
            // 生成單機模式的配置
            config.useSingleServer()
                .setAddress(prefix + redisProperties.getHost() + ":" + redisProperties.getPort())
                .setConnectTimeout(timeout)
                .setDatabase(redisProperties.getDatabase())
                .setPassword(redisProperties.getPassword());
        }
        
        return Redisson.create(config);
    }

    private String[] convert(List<String> nodesObject) {
        // 將哨兵或集群模式的nodes轉換成標準配置
        List<String> nodes = new ArrayList<String>(nodesObject.size());
        for (String node : nodesObject) {
            if (!node.startsWith("redis://") && !node.startsWith("rediss://")) {
                nodes.add("redis://" + node);
            } else {
                nodes.add(node);
            }
        }
        return nodes.toArray(new String[nodes.size()]);
    }

    private InputStream getConfigStream() throws IOException {
        // 讀取redisson配置文件
        Resource resource = ctx.getResource(redissonProperties.getConfig());
        InputStream is = resource.getInputStream();
        return is;
    }

    
}

網上查到的資料中很多配置都是多余的(可能是版本問題),看下源碼很清楚,這也是看源碼的一個好處。

總結

(1)redis由于歷史原因導致有三種模式:單機、哨兵、集群;

(2)redis實現分布式鎖的進化史:set -> setnx -> setnx + setex -> set nx ex(或px) -> set nx ex(或px) + lua script -> redisson;

(3)redis分布式鎖有現成的輪子redisson可以使用;

(4)redisson還提供了很多有用的組件,比如分布式集合、分布式同步器、分布式對象;

彩蛋

redis分布式鎖有哪些優點?

答:1)大部分系統都依賴于redis做緩存,不需要額外依賴其它組件(相對于zookeeper來說);

2)redis可以集群部署,相對于mysql的單點更可靠;

3)不會占用mysql的連接數,不會增加mysql的壓力;

4)redis社區相對活躍,redisson的實現更是穩定可靠;

5)利用過期機制解決客戶端斷線的問題,雖然不太及時;

6)有現成的輪子redisson可以使用,鎖的種類比較齊全;

redis分布式鎖有哪些缺點?

答:1)集群模式下會在所有master節點執行加鎖命令,大部分(2N+1)成功了則獲得鎖,節點越多,加鎖的過程越慢;

2)高并發情況下,未獲得鎖的線程會睡眠重試,如果同一把鎖競爭非常激烈,會占用非常多的系統資源;

3)歷史原因導致的坑挺多的,自己很難實現出來健壯的redis分布式鎖;

總之,redis分布式鎖的優點是大于缺點的,而且社區活躍,這也是我們大部分系統使用redis作為分布式鎖的原因。

推薦閱讀

1、死磕 java同步系列之開篇

2、死磕 java魔法類之Unsafe解析

3、死磕 java同步系列之JMM(Java Memory Model)

4、死磕 java同步系列之volatile解析

5、死磕 java同步系列之synchronized解析

6、死磕 java同步系列之自己動手寫一個鎖Lock

7、死磕 java同步系列之AQS起篇

8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

10、死磕 java同步系列之ReentrantLock VS synchronized

11、死磕 java同步系列之ReentrantReadWriteLock源碼解析

12、死磕 java同步系列之Semaphore源碼解析

13、死磕 java同步系列之CountDownLatch源碼解析

14、死磕 java同步系列之AQS終篇

15、死磕 java同步系列之StampedLock源碼解析

16、死磕 java同步系列之CyclicBarrier源碼解析

17、死磕 java同步系列之Phaser源碼解析

18、死磕 java同步系列之mysql分布式鎖

19、死磕 java同步系列之zookeeper分布式鎖


歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。

qrcode

posted @ 2019-10-04 09:01 彤哥讀源碼 閱讀(...) 評論(...) 編輯 收藏
乐就娱乐