Redis解讀(3):Redis分布式鎖、消息隊列、操作位圖進階應用
Redis 做分布式鎖
分布式鎖也算是 Redis 比較常見的使用場景
問題場景:
例如一個簡單的用戶操作,一個線城去修改用戶的狀態,首先從數據庫中讀出用戶的狀態,然后
在內存中進行修改,修改完成后,再存回去。在單線程中,這個操作沒有問題,但是在多線程
中,由于讀取、修改、存 這是三個操作,不是原子操作,所以在多線程中,這樣會出問題。
對于這種問題,我們可以使用分布式鎖來限制程序的并發執行。
1.基本用法
分布式鎖實現的思路很簡單,就是進來一個線城先占位,當別的線城進來操作時,發現已經有人占位
了,就會放棄或者稍后再試。
在 Redis 中,占位一般使用 setnx 指令,先進來的線城先占位,線城的操作執行完成后,再調用 del 指
令釋放位子。
根據上面的思路,我們寫出的代碼如下:
package org.taoguoguo.distributed.lock;
import org.taoguoguo.redis.Redis;
/**
* @author taoguoguo
* @description LockTest
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-11 16:19
*/
public class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis -> {
Long setnx = jedis.setnx("lockName", "lockValue");
if(1 == setnx){
//沒有線程占位,執行業務代碼
jedis.set("name","taoguoguo");
System.out.println(jedis.get("name"));
//釋放資源
jedis.del("lockName");
}else{
//有線程占位,停止/暫緩 操作
}
});
}
}
上面的代碼存在一個小小問題:如果代碼業務執行的過程中拋異常或者掛了,這樣會導致 del 指令沒有
被調用,這樣,lockName 無法釋放,后面來的請求全部堵塞在這里,鎖也永遠得不到釋放。
要解決這個問題,我們可以給鎖添加一個過期時間,確保鎖在一定的時間之后,能夠得到釋放。改進后
的代碼如下:
package org.taoguoguo.distributed.lock;
import org.taoguoguo.redis.Redis;
/**
* @author taoguoguo
* @description LockTest
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-11 16:19
*/
public class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis -> {
Long setnx = jedis.setnx("lockName", "lockValue");
if(1 == setnx){
//給鎖添加一個過期時間,防止應用在運行過程中拋出異常導致鎖無法及時得到釋放
jedis.expire("lockName",5);
//沒有線程占位,執行業務代碼
jedis.set("name","taoguoguo");
System.out.println(jedis.get("name"));
//釋放資源
jedis.del("lockName");
}else{
//有線程占位,停止/暫緩 操作
}
});
}
}
這樣改造之后,還有一個問題,就是在獲取鎖和設置過期時間之間如果如果服務器突然掛掉了,這個時
候鎖被占用,無法及時得到釋放,也會造成死鎖,因為獲取鎖和設置過期時間是兩個操作,不具備原子
性。
為了解決這個問題,從 Redis2.8 開始,setnx 和 expire 可以通過一個命令一起來執行了,我們對上述
代碼再做改進:
package org.taoguoguo.distributed.lock;
import org.taoguoguo.redis.Redis;
import redis.clients.jedis.params.SetParams;
/**
* @author taoguoguo
* @description LockTest
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-11 16:19
*/
public class LockTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis -> {
String set = jedis.set("lockName", "lockValue", new SetParams().nx().ex(5));
if(set != null && "OK".equals(set)){
//沒有線程占位,執行業務代碼
jedis.set("name","taoguoguo");
System.out.println(jedis.get("name"));
//釋放資源
jedis.del("lockName");
}else{
//有線程占位,停止/暫緩 操作
}
});
}
}
2.解決超時問題
問題場景:
為了防止業務代碼在執行的時候拋出異常,我們給每一個鎖添加了一個超時時間,超時之后,鎖會被自
動釋放,但是這也帶來了一個新的問題:如果要執行的業務非常耗時,可能會出現紊亂。舉個例子:第
一個線程首先獲取到鎖,然后開始執行業務代碼,但是業務代碼比較耗時,執行了 8 秒,這樣,會在第
一個線程的任務還未執行成功鎖就會被釋放了,此時第二個線程會獲取到鎖開始執行,在第二個線程剛
執行了 3 秒,第一個線程也執行完了,此時第一個線程會釋放鎖,但是注意,它釋放的第二個線程的
鎖,釋放之后,第三個線程進來。
對于這個問題,我們可以從兩個角度入手:
- 盡量避免在獲取鎖之后,執行耗時操作。
- 可以在鎖上面做文章,將鎖的 value 設置為一個隨機字符串,每次釋放鎖的時候,都去比較隨機
字符串是否一致,如果一致,再去釋放,否則,不釋放。
對于第二種方案,由于釋放鎖的時候,要去查看鎖的 value,第二個比較 value 的值是否正確,第三步
釋放鎖,有三個步驟,很明顯三個步驟不具備原子性,為了解決這個問題,我們得引入 Lua 腳本。
Lua 腳本的優勢:
-
使用方便,Redis 中內置了對 Lua 腳本的支持。
-
Lua 腳本可以在 Redis 服務端原子的執行多個 Redis 命令。
-
由于網絡在很大程度上會影響到 Redis 性能,而使用 Lua 腳本可以讓多個命令一次執行,可以有
效解決網絡給 Redis 帶來的性能問題。
在 Redis 中,使用 Lua 腳本,大致上兩種思路:
- 提前在 Redis 服務端寫好 Lua 腳本,然后在 Java 客戶端去調用腳本(推薦)。
- 可以直接在 Java 端去寫 Lua 腳本,寫好之后,需要執行時,每次將腳本發送到 Redis 上去執行。
首先在 Redis 服務端創建 Lua 腳本,內容如下:
if redis.call("get",KEYS[1])==ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
接下來,可以給 Lua 腳本求一個 SHA1 和,命令如下:
cat releasewherevalueequal.lua | redis-cli -a 123456 script load --pipe

script load 這個命令會在 Redis 服務器中緩存 Lua 腳本,并返回腳本內容的 SHA1 校驗和,然后在 Java 端調用時,傳入 SHA1 校驗和作為參數,這樣 Redis 服務端就知道執行哪個腳本了。
接下來,在 Java 端調用這個腳本。
package org.taoguoguo.redis;
import redis.clients.jedis.params.SetParams;
import java.util.Arrays;
import java.util.UUID;
/**
* @author taoguoguo
* @description LuaTest
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-11 17:56
*/
public class LuaTest {
public static void main(String[] args) {
Redis redis = new Redis();
for (int i=0; i<10; i++){
redis.execute(jedis -> {
//1.先獲取一個隨機字符串
String value = UUID.randomUUID().toString();
//2.獲取鎖
String lock = jedis.set("lockName", value, new SetParams().nx().ex(5));
//3.判斷是否成功拿到鎖
if (lock != null && "OK".equals(lock)) {
//4.具體的業務操作
jedis.set("site", "http://www.rzrgm.cn/doondo");
String site = jedis.get("site");
System.out.println(site);
//5.釋放鎖
jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", Arrays.asList("lockName"), Arrays.asList(value));
} else {
System.out.println("沒拿到鎖");
}
});
}
}
}
Redis 做消息隊列
我們平時說到消息隊列,一般都是指 RabbitMQ、RocketMQ、ActiveMQ 以及大數據里邊的 Kafka,
這些是我們比較常見的消息中間件,也是非常專業的消息中間件,作為專業的中間件,它里邊提供了許
多功能。
但是,當我們需要使用消息中間件的時候,并非每次都需要非常專業的消息中間件,假如我們只有一個
消息隊列,只有一個消費者,那就沒有必要去使用上面這些專業的消息中間件,這種情況我們可以直接
使用 Redis 來做消息隊列。
Redis 的消息隊列不是特別專業,他沒有很多高級特性,適用簡單的場景,如果對于消息可靠性有著極
高的追求,那么不適合使用 Redis 做消息隊列。
1.消息隊列
Redis 做消息隊列,使用它里邊的 List 數據結構就可以實現,我們可以使用 lpush/rpush 操作來實現入
隊,然后使用 lpop/rpop 來實現出隊。
回顧一下:

在客戶端(例如 Java 端),我們會維護一個死循環來不停的從隊列中讀取消息,并處理,如果隊列中
有消息,則直接獲取到,如果沒有消息,就會陷入死循環,直到下一次有消息進入,這種死循環會造成
大量的資源浪費,這個時候,我們可以使用之前講的 blpop/brpop 。
2.延遲消息隊列
延遲隊列可以通過 zset 來實現,因為 zset 中有一個 score,我們可以把時間作為 score,將 value 存到
redis 中,然后通過輪詢的方式,去不斷的讀取消息出來。
首先,如果消息是一個字符串,直接發送即可,如果是一個對象,則需要對對象進行序列化,這里我們
使用 JSON 來實現序列化和反序列化。
所以,首先在項目中,添加 JSON 依賴:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.3</version>
</dependency>
接下來,構造一個消息對象:
package org.taoguoguo.message;
/**
* @author taoguoguo
* @description RedisMessage 消息對象
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-12 20:33
*/
public class RedisMessage {
//消息ID
private String id;
//消息體
private Object data;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
@Override
public String toString() {
return "RedisMessage{" +
"id='" + id + '\'' +
", data=" + data +
'}';
}
}
接下來封裝一個消息隊列:
package org.taoguoguo.message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;
import java.util.Date;
import java.util.Set;
import java.util.UUID;
/**
* @author taoguoguo
* @description DelayMessageQueue 延遲消息隊列
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-12 20:35
*/
public class DelayMessageQueue {
private Jedis jedis;
//消息隊列隊列名
private String queue;
public DelayMessageQueue(Jedis jedis, String queue) {
this.jedis = jedis;
this.queue = queue;
}
/**
* 消息入隊
* @param data 要發送的消息
*/
public void queue(Object data){
try {
//1.構造一個Redis消息對象
RedisMessage message = new RedisMessage();
message.setId(UUID.randomUUID().toString());
message.setData(data);
//2.序列化
String jsonMessage = new ObjectMapper().writeValueAsString(message);
System.out.println("Redis Message publish: " + new Date());
//消息發送,score 延遲 5 秒
jedis.zadd(queue, System.currentTimeMillis()+5000,jsonMessage);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
/**
* 消息消費
*/
public void loop(){
//當前線程未被打斷 一直監聽
while (!Thread.interrupted()){
//讀取 score 在 0 到當前時間戳之間的消息 一次讀取一條,偏移量為0
Set<String> messageSet = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
if(messageSet.isEmpty()){
try {
//如果消息是空的,則休息 500 毫秒然后繼續
Thread.sleep(500);
} catch (InterruptedException e) {
//如果拋出異常 退出循環
break;
}
continue;
}
//如果讀取到了消息,則直接讀取出來
String messageStr = messageSet.iterator().next();
if(jedis.zrem(queue,messageStr) > 0){
//消息存在,并且消費成功
try {
RedisMessage redisMessage = new ObjectMapper().readValue(messageStr, RedisMessage.class);
System.out.println("Redis Message receive: " + new Date() + redisMessage);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}
}
}
}
測試:
package org.taoguoguo.message;
import org.taoguoguo.redis.Redis;
/**
* @author taoguoguo
* @description DelayMessageTest
* @website http://www.rzrgm.cn/doondo
* @create 2021-04-12 21:20
*/
public class DelayMessageTest {
public static void main(String[] args) {
Redis redis = new Redis();
redis.execute(jedis -> {
//構造一個消息隊列
DelayMessageQueue queue = new DelayMessageQueue(jedis, "taoguoguo-delay-queue");
//構造消息生產者
Thread producer = new Thread(){
@Override
public void run() {
for(int i=0;i<5;i++){
queue.queue("http://www.rzrgm.cn/doondo>>>>>"+i);
}
}
};
//構造消息消費者
Thread consumer = new Thread(){
@Override
public void run() {
queue.loop();
}
};
//啟動
producer.start();
consumer.start();
//消費完成后,停止程序,時間大于消費時間
try {
Thread.sleep(10000);
consumer.interrupt();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
Redis操作位圖
1.基本介紹
用戶一年的簽到記錄,如果你用 string 類型來存儲,那你需要 365 個 key/value,操作起來麻煩。通過位圖可以有效的簡化這個操作。
它的統計很簡單:01111000111
每天的記錄占一個位,365 天就是 365 個位,大概 46 個字節,這樣可以有效的節省存儲空間,如果有一天想要統計用戶一共簽到了多少天,統計 1 的個數即可。
對于位圖的操作,可以直接操作對應的字符串(get/set),可以直接操作位(getbit/setbit)
2.基本操作
2.1零存整取
存的時候操作的是位,獲取的時候是獲取整個字符串
例如存儲一個 Java 字符串:
| 字符 | ASCII | 二進制 |
|---|---|---|
| J | 74 | 01001010 |
| a | 97 | 01100001 |
| v | 118 | 01110110 |


2.2整存零取
存一個字符串進去,但是通過位操作獲取字符串。

3.統計
例如簽到記錄:01111000111
1 表示簽到的天,0 表示沒簽到,統計總的簽到天數:可以使用 bitcount。

bitcount 中,可以統計的起始位置,但是注意,這個起始位置是指字符的起始位置而不是 bit 的起始位置。
除了 bitcount 之外,還有一個 bitpos。bitpos 可以用來統計在指定范圍內出現的第一個 1 或者 0 的位置,這個命令中的起始和結束位置都是字符索引,不是 bit 索引,一定要注意。
4.Bit 批處理
在 Redis 3.2 之后,新加了一個功能叫做 bitfiled ,可以對 bit 進行批量操作。
例如:
BITFIELD name get u4 0
表示獲取 name 中的位,從 0 開始獲取,獲取 4 個位,返回一個無符號數字。
- u 表示無符號數字
- i 表示有符號數字,有符號的話,第一個符號就表示符號位,1 表示是一個負數。
BITFIELD 也可以一次執行多個操作。
GET(對于結果不太明白的,學習一下計算機中 位與字節、以及進制之間的關系)

可以一次性進行多個GET

SET:
用無符號的 98 轉成的 8 位二進制數字,代替從第 8 位開始接下來的 8 位數字。

INCRBY:
對置頂范圍進行自增操作,自增操作可能會出現溢出,既可能是向上溢出,也可能是向下溢出。Redis 中對于溢出的處理方案是折返。8 位無符號數 255 加 1 溢出變為 0;8 位有符號數 127,加 1 變為 - 128。
也可以修改默認的溢出策略,可以改為 fail ,表示執行失敗。
BITFIELD name overflow fail incrby u2 6 1
sat 表示留在在最大/最小值。
BITFIELD name overflow sat incrby u2 6 1


浙公網安備 33010602011771號