<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Redis解讀(3):Redis分布式鎖、消息隊列、操作位圖進階應用

      Redis 做分布式鎖

      分布式鎖也算是 Redis 比較常見的使用場景

      問題場景:

      例如一個簡單的用戶操作,一個線城去修改用戶的狀態,首先從數據庫中讀出用戶的狀態,然后
      在內存中進行修改,修改完成后,再存回去。在單線程中,這個操作沒有問題,但是在多線程
      中,由于讀取、修改、存 這是三個操作,不是原子操作,所以在多線程中,這樣會出問題。

      對于這種問題,我們可以使用分布式鎖來限制程序的并發執行。

      1.基本用法

      分布式鎖實現的思路很簡單,就是進來一個線城先占位,當別的線城進來操作時,發現已經有人占位
      了,就會放棄或者稍后再試。

      在 Redis 中,占位一般使用 setnx 指令,先進來的線城先占位,線城的操作執行完成后,再調用 del 指
      令釋放位子。

      根據上面的思路,我們寫出的代碼如下:

      package org.taoguoguo.distributed.lock;
      import org.taoguoguo.redis.Redis;
      
      /**
       * @author taoguoguo
       * @description LockTest
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-11 16:19
       */
      public class LockTest {
          public static void main(String[] args) {
              Redis redis = new Redis();
              redis.execute(jedis -> {
                  Long setnx = jedis.setnx("lockName", "lockValue");
                  if(1 == setnx){
                      //沒有線程占位,執行業務代碼
                      jedis.set("name","taoguoguo");
                      System.out.println(jedis.get("name"));
                      //釋放資源
                      jedis.del("lockName");
                  }else{
                      //有線程占位,停止/暫緩 操作
                  }
              });
          }
      }
      

      上面的代碼存在一個小小問題:如果代碼業務執行的過程中拋異常或者掛了,這樣會導致 del 指令沒有
      被調用,這樣,lockName 無法釋放,后面來的請求全部堵塞在這里,鎖也永遠得不到釋放。

      要解決這個問題,我們可以給鎖添加一個過期時間,確保鎖在一定的時間之后,能夠得到釋放。改進后
      的代碼如下:

      package org.taoguoguo.distributed.lock;
      import org.taoguoguo.redis.Redis;
      
      /**
       * @author taoguoguo
       * @description LockTest
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-11 16:19
       */
      public class LockTest {
          public static void main(String[] args) {
              Redis redis = new Redis();
              redis.execute(jedis -> {
                  Long setnx = jedis.setnx("lockName", "lockValue");
                  if(1 == setnx){
                      //給鎖添加一個過期時間,防止應用在運行過程中拋出異常導致鎖無法及時得到釋放
                      jedis.expire("lockName",5);
                      //沒有線程占位,執行業務代碼
                      jedis.set("name","taoguoguo");
                      System.out.println(jedis.get("name"));
                      //釋放資源
                      jedis.del("lockName");
                  }else{
                      //有線程占位,停止/暫緩 操作
                  }
              });
          }
      }
      

      這樣改造之后,還有一個問題,就是在獲取鎖和設置過期時間之間如果如果服務器突然掛掉了,這個時
      候鎖被占用,無法及時得到釋放,也會造成死鎖,因為獲取鎖和設置過期時間是兩個操作,不具備原子
      性。

      為了解決這個問題,從 Redis2.8 開始,setnx 和 expire 可以通過一個命令一起來執行了,我們對上述
      代碼再做改進:

      package org.taoguoguo.distributed.lock;
      import org.taoguoguo.redis.Redis;
      import redis.clients.jedis.params.SetParams;
      
      /**
       * @author taoguoguo
       * @description LockTest
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-11 16:19
       */
      public class LockTest {
          public static void main(String[] args) {
              Redis redis = new Redis();
              redis.execute(jedis -> {
                  String set = jedis.set("lockName", "lockValue", new SetParams().nx().ex(5));
                  if(set != null && "OK".equals(set)){
                      //沒有線程占位,執行業務代碼
                      jedis.set("name","taoguoguo");
                      System.out.println(jedis.get("name"));
                      //釋放資源
                      jedis.del("lockName");
                  }else{
                      //有線程占位,停止/暫緩 操作
                  }
              });
          }
      }
      
      

      2.解決超時問題

      問題場景:

      為了防止業務代碼在執行的時候拋出異常,我們給每一個鎖添加了一個超時時間,超時之后,鎖會被自
      動釋放,但是這也帶來了一個新的問題:如果要執行的業務非常耗時,可能會出現紊亂。舉個例子:第
      一個線程首先獲取到鎖,然后開始執行業務代碼,但是業務代碼比較耗時,執行了 8 秒,這樣,會在第
      一個線程的任務還未執行成功鎖就會被釋放了,此時第二個線程會獲取到鎖開始執行,在第二個線程剛
      執行了 3 秒,第一個線程也執行完了,此時第一個線程會釋放鎖,但是注意,它釋放的第二個線程的
      鎖,釋放之后,第三個線程進來。

      對于這個問題,我們可以從兩個角度入手:

      • 盡量避免在獲取鎖之后,執行耗時操作。
      • 可以在鎖上面做文章,將鎖的 value 設置為一個隨機字符串,每次釋放鎖的時候,都去比較隨機
        字符串是否一致,如果一致,再去釋放,否則,不釋放。

      對于第二種方案,由于釋放鎖的時候,要去查看鎖的 value,第二個比較 value 的值是否正確,第三步
      釋放鎖,有三個步驟,很明顯三個步驟不具備原子性,為了解決這個問題,我們得引入 Lua 腳本。

      Lua 腳本的優勢:

      • 使用方便,Redis 中內置了對 Lua 腳本的支持。

      • Lua 腳本可以在 Redis 服務端原子的執行多個 Redis 命令。

      • 由于網絡在很大程度上會影響到 Redis 性能,而使用 Lua 腳本可以讓多個命令一次執行,可以有
        效解決網絡給 Redis 帶來的性能問題。

      在 Redis 中,使用 Lua 腳本,大致上兩種思路:

      1. 提前在 Redis 服務端寫好 Lua 腳本,然后在 Java 客戶端去調用腳本(推薦)。
      2. 可以直接在 Java 端去寫 Lua 腳本,寫好之后,需要執行時,每次將腳本發送到 Redis 上去執行。

      首先在 Redis 服務端創建 Lua 腳本,內容如下:

      if redis.call("get",KEYS[1])==ARGV[1] then
         return redis.call("del",KEYS[1])
      else
         return 0
      end
      

      接下來,可以給 Lua 腳本求一個 SHA1 和,命令如下:

      cat releasewherevalueequal.lua | redis-cli -a 123456 script load --pipe
      

      script load 這個命令會在 Redis 服務器中緩存 Lua 腳本,并返回腳本內容的 SHA1 校驗和,然后在 Java 端調用時,傳入 SHA1 校驗和作為參數,這樣 Redis 服務端就知道執行哪個腳本了。

      接下來,在 Java 端調用這個腳本。

      package org.taoguoguo.redis;
      
      import redis.clients.jedis.params.SetParams;
      import java.util.Arrays;
      import java.util.UUID;
      
      /**
       * @author taoguoguo
       * @description LuaTest
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-11 17:56
       */
      public class LuaTest {
          public static void main(String[] args) {
              Redis redis = new Redis();
              for (int i=0; i<10; i++){
                  redis.execute(jedis -> {
                      //1.先獲取一個隨機字符串
                      String value = UUID.randomUUID().toString();
                      //2.獲取鎖
                      String lock = jedis.set("lockName", value, new SetParams().nx().ex(5));
                      //3.判斷是否成功拿到鎖
                      if (lock != null && "OK".equals(lock)) {
                          //4.具體的業務操作
                          jedis.set("site", "http://www.rzrgm.cn/doondo");
                          String site = jedis.get("site");
                          System.out.println(site);
                          //5.釋放鎖
                          jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", Arrays.asList("lockName"), Arrays.asList(value));
                      } else {
                          System.out.println("沒拿到鎖");
                      }
                  });
              }
          }
      }
      
      

      Redis 做消息隊列

      我們平時說到消息隊列,一般都是指 RabbitMQ、RocketMQ、ActiveMQ 以及大數據里邊的 Kafka,
      這些是我們比較常見的消息中間件,也是非常專業的消息中間件,作為專業的中間件,它里邊提供了許
      多功能。

      但是,當我們需要使用消息中間件的時候,并非每次都需要非常專業的消息中間件,假如我們只有一個
      消息隊列,只有一個消費者,那就沒有必要去使用上面這些專業的消息中間件,這種情況我們可以直接
      使用 Redis 來做消息隊列。

      Redis 的消息隊列不是特別專業,他沒有很多高級特性,適用簡單的場景,如果對于消息可靠性有著極
      高的追求,那么不適合使用 Redis 做消息隊列。

      1.消息隊列

      Redis 做消息隊列,使用它里邊的 List 數據結構就可以實現,我們可以使用 lpush/rpush 操作來實現入
      隊,然后使用 lpop/rpop 來實現出隊。

      回顧一下:

      在客戶端(例如 Java 端),我們會維護一個死循環來不停的從隊列中讀取消息,并處理,如果隊列中
      有消息,則直接獲取到,如果沒有消息,就會陷入死循環,直到下一次有消息進入,這種死循環會造成
      大量的資源浪費,這個時候,我們可以使用之前講的 blpop/brpop 。

      2.延遲消息隊列

      延遲隊列可以通過 zset 來實現,因為 zset 中有一個 score,我們可以把時間作為 score,將 value 存到
      redis 中,然后通過輪詢的方式,去不斷的讀取消息出來。

      首先,如果消息是一個字符串,直接發送即可,如果是一個對象,則需要對對象進行序列化,這里我們
      使用 JSON 來實現序列化和反序列化。

      所以,首先在項目中,添加 JSON 依賴:

      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.10.3</version>
      </dependency>
      

      接下來,構造一個消息對象:

      package org.taoguoguo.message;
      
      /**
       * @author taoguoguo
       * @description RedisMessage 消息對象
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-12 20:33
       */
      public class RedisMessage {
      
          //消息ID
          private String id;
          //消息體
          private Object data;
      
          public String getId() {
              return id;
          }
      
          public void setId(String id) {
              this.id = id;
          }
      
          public Object getData() {
              return data;
          }
      
          public void setData(Object data) {
              this.data = data;
          }
      
          @Override
          public String toString() {
              return "RedisMessage{" +
                      "id='" + id + '\'' +
                      ", data=" + data +
                      '}';
          }
      }
      

      接下來封裝一個消息隊列:

      package org.taoguoguo.message;
      
      import com.fasterxml.jackson.core.JsonProcessingException;
      import com.fasterxml.jackson.databind.ObjectMapper;
      import redis.clients.jedis.Jedis;
      
      import java.util.Date;
      import java.util.Set;
      import java.util.UUID;
      
      /**
       * @author taoguoguo
       * @description DelayMessageQueue 延遲消息隊列
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-12 20:35
       */
      public class DelayMessageQueue {
      
          private Jedis jedis;
          //消息隊列隊列名
          private String queue;
      
          public DelayMessageQueue(Jedis jedis, String queue) {
              this.jedis = jedis;
              this.queue = queue;
          }
      
          /**
           * 消息入隊
           * @param data 要發送的消息
           */
          public void queue(Object data){
              try {
                  //1.構造一個Redis消息對象
                  RedisMessage message = new RedisMessage();
                  message.setId(UUID.randomUUID().toString());
                  message.setData(data);
                  //2.序列化
                  String jsonMessage = new ObjectMapper().writeValueAsString(message);
                  System.out.println("Redis Message publish: " + new Date());
                  //消息發送,score 延遲 5 秒
                  jedis.zadd(queue, System.currentTimeMillis()+5000,jsonMessage);
              } catch (JsonProcessingException e) {
                  e.printStackTrace();
              }
          }
      
          /**
           * 消息消費
           */
          public void loop(){
              //當前線程未被打斷 一直監聽
              while (!Thread.interrupted()){
                  //讀取 score 在 0 到當前時間戳之間的消息 一次讀取一條,偏移量為0
                  Set<String> messageSet = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
                  if(messageSet.isEmpty()){
                      try {
                          //如果消息是空的,則休息 500 毫秒然后繼續
                          Thread.sleep(500);
                      } catch (InterruptedException e) {
                          //如果拋出異常 退出循環
                          break;
                      }
                      continue;
                  }
                  //如果讀取到了消息,則直接讀取出來
                  String messageStr = messageSet.iterator().next();
                  if(jedis.zrem(queue,messageStr) > 0){
                      //消息存在,并且消費成功
                      try {
                          RedisMessage redisMessage = new ObjectMapper().readValue(messageStr, RedisMessage.class);
                          System.out.println("Redis Message receive: " + new Date() + redisMessage);
                      } catch (JsonProcessingException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      
      }
      

      測試:

      package org.taoguoguo.message;
      
      import org.taoguoguo.redis.Redis;
      
      /**
       * @author taoguoguo
       * @description DelayMessageTest
       * @website http://www.rzrgm.cn/doondo
       * @create 2021-04-12 21:20
       */
      public class DelayMessageTest {
          public static void main(String[] args) {
              Redis redis = new Redis();
              redis.execute(jedis -> {
                  //構造一個消息隊列
                  DelayMessageQueue queue = new DelayMessageQueue(jedis, "taoguoguo-delay-queue");
                  //構造消息生產者
                  Thread producer = new Thread(){
                      @Override
                      public void run() {
                          for(int i=0;i<5;i++){
                              queue.queue("http://www.rzrgm.cn/doondo>>>>>"+i);
                          }
                      }
                  };
      
                  //構造消息消費者
                  Thread consumer = new Thread(){
                      @Override
                      public void run() {
                          queue.loop();
                      }
                  };
      
                  //啟動
                  producer.start();
                  consumer.start();
                  //消費完成后,停止程序,時間大于消費時間
                  try {
                      Thread.sleep(10000);
                      consumer.interrupt();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              });
          }
      }
      

      Redis操作位圖

      1.基本介紹

      用戶一年的簽到記錄,如果你用 string 類型來存儲,那你需要 365 個 key/value,操作起來麻煩。通過位圖可以有效的簡化這個操作。

      它的統計很簡單:01111000111

      每天的記錄占一個位,365 天就是 365 個位,大概 46 個字節,這樣可以有效的節省存儲空間,如果有一天想要統計用戶一共簽到了多少天,統計 1 的個數即可。

      對于位圖的操作,可以直接操作對應的字符串(get/set),可以直接操作位(getbit/setbit)

      2.基本操作

      2.1零存整取

      存的時候操作的是位,獲取的時候是獲取整個字符串

      例如存儲一個 Java 字符串:

      字符 ASCII 二進制
      J 74 01001010
      a 97 01100001
      v 118 01110110

      2.2整存零取

      存一個字符串進去,但是通過位操作獲取字符串。

      3.統計

      例如簽到記錄:01111000111
      1 表示簽到的天,0 表示沒簽到,統計總的簽到天數:可以使用 bitcount。

      bitcount 中,可以統計的起始位置,但是注意,這個起始位置是指字符的起始位置而不是 bit 的起始位置。

      除了 bitcount 之外,還有一個 bitpos。bitpos 可以用來統計在指定范圍內出現的第一個 1 或者 0 的位置,這個命令中的起始和結束位置都是字符索引,不是 bit 索引,一定要注意。

      4.Bit 批處理

      在 Redis 3.2 之后,新加了一個功能叫做 bitfiled ,可以對 bit 進行批量操作。

      例如:

      BITFIELD name get u4 0
      

      表示獲取 name 中的位,從 0 開始獲取,獲取 4 個位,返回一個無符號數字。

      • u 表示無符號數字
      • i 表示有符號數字,有符號的話,第一個符號就表示符號位,1 表示是一個負數。

      BITFIELD 也可以一次執行多個操作。

      GET(對于結果不太明白的,學習一下計算機中 位與字節、以及進制之間的關系)

      可以一次性進行多個GET

      SET:

      用無符號的 98 轉成的 8 位二進制數字,代替從第 8 位開始接下來的 8 位數字。

      INCRBY:

      對置頂范圍進行自增操作,自增操作可能會出現溢出,既可能是向上溢出,也可能是向下溢出。Redis 中對于溢出的處理方案是折返。8 位無符號數 255 加 1 溢出變為 0;8 位有符號數 127,加 1 變為 - 128。

      也可以修改默認的溢出策略,可以改為 fail ,表示執行失敗。

      BITFIELD name overflow fail incrby u2 6 1
      

      sat 表示留在在最大/最小值。

      BITFIELD name overflow sat incrby u2 6 1
      

      posted @ 2021-04-14 16:10  DOONDO  閱讀(393)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲区一区二区三区精品| 性欧美暴力猛交69hd| 90后极品粉嫩小泬20p| 亚洲日韩图片专区第1页| 哈密市| 加勒比色综合久久久久久久久| 日韩国产中文字幕精品| 国产av人人夜夜澡人人爽麻豆| 久久日产一线二线三线| 久久精品国产91久久麻豆| 中文字幕在线视频不卡| 人妻精品动漫h无码| 日韩深夜视频在线观看| 色综合热无码热国产| 国产精品深夜福利免费观看| 好吊妞| 亚洲国产成人片在线观看无码 | 少妇人妻激情乱人伦| 欧美精品在线观看视频 | 91精品国产91热久久久久福利 | 久久免费偷拍视频有没有| 国产一码二码三码区别| 97在线碰| 美女禁区a级全片免费观看| 艳妇乳肉豪妇荡乳xxx| 无码日韩精品一区二区免费| 久久婷婷五月综合色99啪ak| 亚洲精品中文av在线| 亚洲av无在线播放中文| 国产在线播放专区av| 午夜国产精品福利一二| 日韩区二区三区中文字幕| 欧美国产精品不卡在线观看| 老司机性色福利精品视频| 蜜桃精品成人影片| 亚洲狠狠狠一区二区三区| 97欧美精品系列一区二区| 无码人妻丝袜在线视频红杏| 国产成人精品一区二三区| 亚洲午夜香蕉久久精品| 99久久免费精品国产色|