<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      redisson內存泄漏問題排查

      問題描述

      最近生產有個服務突然出現頻繁告警,接口P99響應時間變長,運維同學觀察到相應的pod cpu飆升,內存占用很高。
      cpu升高問題排查是老生常談的話題了,一般可以使用top -p pid -H查看是哪個線程占用cpu高,再結合jstack找到對應的java線程代碼。
      不過經驗告訴我們,cpu升高還有另外一個更常見的原因,內存不足導致頻繁gc。垃圾收集器回收內存后又很快不足,繼續回收,循環這個過程,而gc期間涉及到STW,用戶線程會被掛起,響應時間自然會增加。這里的內存不足可能是正常的服務本身內存就不夠用,也可以是異常的程序bug導致內存溢出。
      果不其然,當時節點的full gc時間陡增,通過jstat -gcutil pid 500 30也可以看到fc非常頻繁。如圖:

      這個問題實際月初也出現過,當時研發同學和運維同學通過重啟暫時解決,今天又出現了,看來不是簡單通過“重啟大法”能解決的,這次我們需要分析解決它。

      排查過程

      這次我們通過heap dump將堆導出分析,命令:

      jmap -dump:format=b,file=./pid.hprof pid
      

      用jdk自帶的virsualvmidea virsualvm launcher插件打開堆文件可以看到

      很明顯,跟redisson相關,我們使用的版本是3.17.1!查找服務涉及到redisson的地方并不多,調用量高且可疑的只有一處,簡化后的代碼如下:

      RLock lock = this.redissonClient.getLock("mytest");
      lock.tryLock(50, 100, TimeUnit.MILLISECONDS);
              
      //業務代碼...
      
      RLock lock2 = this.redissonClient.getLock("mytest");
      if (lock2.isLocked() && lock2.isHeldByCurrentThread()) {
        lock2.unlock();
      }
      

      首先我們先簡單分析下RedissonLock tryLock和unlock的源碼,主要地方添加了備注。

          @Override
          public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
              long time = unit.toMillis(waitTime);
              long current = System.currentTimeMillis();
              long threadId = Thread.currentThread().getId();
              Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
              // 獲取到鎖,返回成功
              if (ttl == null) {
                  return true;
              }
              
              time -= System.currentTimeMillis() - current;
              if (time <= 0) {
                  //或取不到鎖,且超過等待時間,返回失敗
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }
              
              current = System.currentTimeMillis();
              //訂閱鎖釋放消息,subscribe是本次的核心!!!
              CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
              try {
                  subscribeFuture.get(time, TimeUnit.MILLISECONDS);
              } catch (ExecutionException | TimeoutException e) {
                  //超時,獲取鎖失敗
                  if (!subscribeFuture.cancel(false)) {
                      subscribeFuture.whenComplete((res, ex) -> {
                          if (ex == null) {
                              unsubscribe(res, threadId);
                          }
                      });
                  }
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }
      
              try {
                  time -= System.currentTimeMillis() - current;
                  if (time <= 0) {
                      acquireFailed(waitTime, unit, threadId);
                      return false;
                  }
              
                  //鎖釋放了,還未超時,自旋嘗試獲取
                  while (true) {
                      long currentTime = System.currentTimeMillis();
                      ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                      // 獲取到鎖,返回成功
                      if (ttl == null) {
                          return true;
                      }
      
                      time -= System.currentTimeMillis() - currentTime;
                      if (time <= 0) {
                          //或取不到鎖,且超過等待時間,返回失敗
                          acquireFailed(waitTime, unit, threadId);
                          return false;
                      }
      
                      // 等待鎖釋放
                      currentTime = System.currentTimeMillis();
                      if (ttl >= 0 && ttl < time) {
                          commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      } else {
                          commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                      }
      
                      time -= System.currentTimeMillis() - currentTime;
                      if (time <= 0) {
                          //或取不到鎖,且超過等待時間,返回失敗
                          acquireFailed(waitTime, unit, threadId);
                          return false;
                      }
                  }
              } finally {
                  //取消訂閱
                  unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
              }
          }
      
          @Override
          public RFuture<Void> unlockAsync(long threadId) {
              RFuture<Boolean> future = unlockInnerAsync(threadId);
      
              CompletionStage<Void> f = future.handle((opStatus, e) -> {
                  //取消鎖續期
                  cancelExpirationRenewal(threadId);
                  //...
              });
      
              return new CompletableFutureWrapper<>(f);
          }
      
          protected RFuture<Boolean> unlockInnerAsync(long threadId) {
              return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                      "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                              "return nil;" +
                              "end; " +
                              "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                              "if (counter > 0) then " +
                              "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                              "return 0; " +
                              "else " +
                              "redis.call('del', KEYS[1]); " +
                              "redis.call('publish', KEYS[2], ARGV[1]); " +
                              "return 1; " +
                              "end; " +
                              "return nil;",
                      Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
          }
      

      redisson加解鎖主要利用了lua腳本和redis的發布訂閱機制,使用到的數據結構是hash
      lua腳本保證了多個命令執行的原子性,不會有并發問題。
      在java代碼中使用synchroized/lock加鎖失敗時,可以將線程放到鏈表中等待喚醒重新獲取鎖。在使用redis的分布式系統中,使用的是發布訂閱機制,通過訂閱channel,當鎖釋放時重新獲取鎖。redis的發布訂閱跟我們使用kafka等mq中間件是一樣的原理,實際也可以用redis的發布訂閱機制來實現mq功能,如下channel相當于是mq中的topic。相關命令是:

      • PUBLISH channel message,發布一個消息到channel。
      • SUBSCRIBE channel [channel ...],訂閱channel,當channel有消息時,客戶端會收到通知。
      • UNSUBSCRIBE [channel [channel ...]],取消訂閱
      • PSUBSCRIBE pattern [pattern ...],訂閱匹配模式的channel
      • PUNSUBSCRIBE [pattern [pattern ...]],取消訂閱匹配模式的channel

      接下來是我們的排查過程:

      懷疑寫法問題

      回到我們的代碼,首先映入眼簾值得懷疑的是,加鎖和解鎖使用不是同個對象,如果redisson加解鎖是與對象狀態相關的,那就會有問題。
      但從源碼分析可以看到,解鎖邏輯非常簡單,主要使用到的是線程id,這個是不會變的。當然這種寫法還是要修正,除了會給人誤導,也沒必要多創建一個鎖對象。此外持有鎖的時間設置為100ms也太短了,盡管業務邏輯處理很快,但如果持有鎖期間發生full gc,鎖就會過期,其它線程就可以獲取到鎖,出現并發執行。

      懷疑網絡問題

      由于不是頻繁出現問題,一個月就出現一兩次,所以懷疑是不是某些特殊條件才觸發,例如當時出現過網絡抖動,主從切換等異常情況。聯系dba同學得知前一天redis網絡確實出現過抖動,結合生產日志發現8月份出現兩次問題的前一天都有redis異常,redisson github上也有一些相關討論,這更堅定了我的推測,在網絡異常情況下可能觸發某個bug,導致內存溢出,驗證這一點也浪費了我們不少時間。

      網絡問題主要有兩種,連接直接斷開和讀取超時。連接直接斷開我們連開發環境的redis很好模擬,直接將內網斷開即可。讀取超時可以使用redis-cli登錄redis server,然后使用client pause命令阻塞客戶端,如下會阻塞所有客戶端請求10s,這個命令在我平時一些模擬測試也經常用到。

      client pause 10000
      

      接著寫代碼循環測試,使用jvirsualvm觀察內存對象,發現并沒有問題,redisson相關對象占比都很低,且能被gc回收。

      for (int i = 0; i < 10000000; i++) {
        //貼入前面的代碼
      }
      

      源碼分析

      前面的源碼分析是最外層,最簡單的部分,還不足以幫忙我們發現問題。從前面subscribe方法進入,內部還有大量邏輯做并發控制和發布訂閱相關邏輯。
      進入subscribe,會調用PublishScribe的subscribe方法,接著會調用AsyncSemaphore的acquire方法獲取信號量。jdk的Semaphore我們都很熟悉,AsyncSemaphore是異步的形式,使用信號量最關鍵的就是申請到許可使用完后,要調用release方法歸還,否則其它申請者就無法再次申請到許可。

          public CompletableFuture<E> subscribe(String entryName, String channelName) {
              AsyncSemaphore semaphore = service.getSemaphore(new ChannelName(channelName));
              CompletableFuture<E> newPromise = new CompletableFuture<>();
      
              semaphore.acquire(() -> {
                  if (newPromise.isDone()) {
                      semaphore.release();
                      return;
                  }
      
                  E entry = entries.get(entryName);
                  if (entry != null) {
                      entry.acquire();
                      //1.釋放許可
                      semaphore.release();
                      //...
                      return;
                  }
      
                  E oldValue = entries.putIfAbsent(entryName, value);
                  if (oldValue != null) {
                      //2.釋放許可
                      semaphore.release();
                      //...
                      return;
                  }
      
                  RedisPubSubListener<Object> listener = createListener(channelName, value);
                  CompletableFuture<PubSubConnectionEntry> s = service.subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener);            
                  //...
              });
      
              return newPromise;
          }
      

      AsyncSemaphore主要代碼如下,permits是1,listeners是一個無界隊列。在我們dump出來的異常實例中有一個AsyncSemaphore lambda對象,也有CompletableFuture lambda對象,看起來和這里高度匹配,這里大概率就是問題所在了,應該是在某種情況下,acquire后沒有調用release,導致其它線程調用decrementAndGet的時候是<=0,進而沒法執行listeners.poll()移除元素,最終listeners隊列元素越來越多,直到內存溢出。

      public class AsyncSemaphore {
          private final AtomicInteger counter;
          private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>();
      
          public AsyncSemaphore(int permits) {
              counter = new AtomicInteger(permits);
          }
      
          public CompletableFuture<Void> acquire() {
              CompletableFuture<Void> future = new CompletableFuture<>();
              listeners.add(future);
              tryRun();
              return future;
          }
      
          public void acquire(Runnable listener) {
              acquire().thenAccept(r -> listener.run());
          }
      
          private void tryRun() {
              while (true) {
                  if (counter.decrementAndGet() >= 0) {
                      CompletableFuture<Void> future = listeners.poll();
                      if (future == null) {
                          counter.incrementAndGet();
                          return;
                      }
      
                      if (future.complete(null)) {
                          return;
                      }
                  }
      
                  if (counter.incrementAndGet() <= 0) {
                      return;
                  }
              }
          }
      
          public void release() {
              counter.incrementAndGet();
              tryRun();
          }
      }
      

      關于Semaphore還有話說,如果一次acquire,但程序異常多次調用release,將導致許可超發,后續的acquire可以申請到許可執行。解決方案可以參考rocketmq SemaphoreReleaseOnlyOnce,它封裝了Semaphore,并維護一個AtomicBoolean,保證只能釋放一次。

      回到上面subscribe方法,有兩處正常調用了release,還有一處進入了PublishSubscribeServie的subscribeNoTimeout(LongCodec.INSTANCE, channelName, semaphore, listener)方法,重點這里傳的topicType類型是PubSubType.SUBSCRIBE

          public CompletableFuture<PubSubConnectionEntry> subscribeNoTimeout(Codec codec, String channelName,
                                                                    AsyncSemaphore semaphore, RedisPubSubListener<?>... listeners) {
              CompletableFuture<PubSubConnectionEntry> promise = new CompletableFuture<>();
              //重點:PubSubType.SUBSCRIBE
              subscribeNoTimeout(codec, new ChannelName(channelName), getEntry(new ChannelName(channelName)), promise,
                              PubSubType.SUBSCRIBE, semaphore, new AtomicInteger(), listeners);
              return promise;
          }
      

      里面的邏輯比較復雜,有興趣的同學可以自己分析分析,但我們關注的是每個分支最終都需要調用semaphore.release。
      按照這個思路,最終筆者在此處發現一處可能沒有調用release的方法:org.redisson.pubsub.PublishSubscribeService#unsubscribe。
      unsubscribe方法在complete的時候會執行lock.release(),它的complete是在BaseRedisPubSubListener回調中調用的,只有if條件成立才會執行。前面我們說傳記錄的topicType是subscribe,而這里BaseRedisPubSubListener處理的是unsubscribepunsubscribe類型,對應不上了,這就導致whenComplete不會執行,lock.release()不會執行。

       private CompletableFuture<Void> addListeners(ChannelName channelName, CompletableFuture<PubSubConnectionEntry> promise,
                  PubSubType type, AsyncSemaphore lock, PubSubConnectionEntry connEntry,
                  RedisPubSubListener<?>... listeners) {
      
              //...
              subscribeFuture.whenComplete((res, e) -> {
                  if (e != null) {
                      lock.release();
                      return;
                  }
      
                  if (!promise.complete(connEntry)) {
                      if (!connEntry.hasListeners(channelName)) {
                          unsubscribe(type, channelName)
                              .whenComplete((r, ex) -> {
                                  //這里不會被執行,AsyncSemaphore release沒有執行!
                                  lock.release();
                              });
                      } else {
                          lock.release();
                      }
                  } else {
                      lock.release();
                  }
              });
              return subscribeFuture;
      }
      
       public CompletableFuture<Void> unsubscribe(PubSubType topicType, ChannelName channelName) {
              //...
              BaseRedisPubSubListener listener = new BaseRedisPubSubListener() {
      
                  @Override
                  public boolean onStatus(PubSubType type, CharSequence channel) {
                      //這個if不會進入...
                      if (type == topicType && channel.equals(channelName)) {
                          executed.set(true);
      
                          if (entry.release() == 1) {
                              MasterSlaveEntry msEntry = getEntry(channelName);
                              msEntry.returnPubSubConnection(entry.getConnection());
                          }
      
                          //觸發外面whenComplete的執行
                          result.complete(null);
                          return true;
                      }
                      return false;
                  }
      
              };
      
              ChannelFuture future;
              //這里是unsubscribe和punsubscribe,而前面傳進來的topicType是subscribe,對不上了
              if (topicType == PubSubType.UNSUBSCRIBE) {
                  future = entry.unsubscribe(channelName, listener);
              } else {
                  future = entry.punsubscribe(channelName, listener);
              }
              return result;
      }
      

      問題復現

      前面分析得頭頭是道,我們還得通過實踐證明一下,有理有據才行。
      我的復現代碼如下,通過并發調用加鎖,開始運行加個斷點在org.redisson.pubsub.PublishSubscribeService#unsubscribe里的BaseRedisPubSubListener的onStatus方法,發現正如前面所說,topicType確實對不上。接著運行一段時間后,打一個斷點在AsyncSemaphore.acquire方法,觀察到listener屬性的size不斷增長,通過jmap pid GC.run觸發gc后也不會回收,問題得以復現。

      public void test() {
        for (int i = 0; i < 20000000; i++) {
          executor.submit(() -> {
            //貼入前面的代碼,提交到線程池
          });
        }
      }
      

      問題解決

      在開始排查問題的時候,筆者就在github提issue咨詢是什么原因,如何解決。他們的回復是跟這個相關,并推薦升級到3.21.2版本,不過里面提到的描述跟我的不太一樣,所以按照版本選擇的經驗,我決定將版本升級到3.17最后一個小版本3.17.7試一下,重新跑上面的測試代碼,跑一段時間后,發現問題沒有出現了。

      查看org.redisson.pubsub.PublishSubscribeService#unsubscribe源碼,發現出問題那段邏輯已經被修復了。

      經驗總結

      遇到難啃問題幾乎是每個開發不可避免的事情,解決問題的過程,方法和事后復盤,經驗總結非常重要,對個人的學習和能力提升有很大的幫助。
      以下幾點是我本次的總結:

      • 及時止損
        當生產出現問題,很多開發同學首先會想如何找到原因,解決根本問題,但實際情況應該是評估影響,及時止損,避免問題發散,擴大影響。
        例如不能在短時間內解決的,還要下來慢慢看日志,分析代碼的,能回滾的先回滾,能重啟的先重啟,爭取在出現資損前解決問題,減少對業務產生影響。

      • 向上匯報
        遇到棘手問題不要悶聲自己想辦法解決,正確做法是先向你的leader匯報問題和風險。如果問題比較棘手和嚴重,可以請求協助,避免因為個人能力不足遲遲不能解決問題,小問題拖成大問題。

      • 保留現場
        有時候問題是難以復現的,像我們本次的情況一個月可能就出現一次,如果直接重啟服務,那么等下次問題出現就非常久了。所以正確的做法是保留現場,同時要不影響業務,可以保留一個節點,將其流量摘除,通過jstack/jmap dump出程序堆棧,其它節點重啟。

      • 保持耐心
        有些問題不是一時半會就能解決的,有的以天為單位,有的可能要一個月才解決。所以保持耐心很重要,多看看官方文檔,github issue,分析源碼,嘗試各種方式,排除各種可能,相信總會找到解決方法。

      • 版本選擇
        我們選擇的redisson版本是3.17.1,實際這個選擇不是很好。按照x.y.z的版本規范,x表示大版本,通常是有重大更新,y表示小版本,通常是一些功能迭代,z表示修復版本,通常是修bug用的。例如springboot從2.x升級到3.0,jdk版本要求最低17,是一個非常重大的更新。
        上面我為什么選擇3.17.7來測試,是因為3.17.7是3.17的最后一個小版本,看到這個版本的release報告你就知道是為什么了,它全部都是在修bug。
        當然本次的問題修復不一定在.7這個版本,可能是在1-7之間的某個版本,有興趣的可以再細看下。

      更多分享,歡迎關注我的github:https://github.com/jmilktea/jtea

      posted @ 2024-09-24 10:15  jtea  閱讀(2281)  評論(1)    收藏  舉報
      主站蜘蛛池模板: 成人国产精品中文字幕| 五月天中文字幕mv在线| 国产又色又爽又黄的视频在线| 我国产码在线观看av哈哈哈网站| 一本本月无码-| 鲁一鲁一鲁一鲁一澡| 日韩本精品一区二区三区| 亚洲中文字幕伊人久久无码 | 精品久久久无码中文字幕| 国产不卡在线一区二区| 久久99精品久久久大学生| 日本A级视频在线播放| 重口SM一区二区三区视频 | 精品国产午夜理论片不卡| 性色a∨精品高清在线观看| 国产精品大片中文字幕| 亚洲一区二区乱码精品| 亚洲色大成网站WWW永久麻豆| 国产精品午夜福利合集| 国产一区二区视频在线看| 久久免费看少妇免费观看| 国产尤物精品自在拍视频首页| av永久免费网站在线观看| 国产高清视频在线播放www色| 亚洲一区二区中文字幕| 国产成人精品亚洲日本片| 天美传媒xxxxhd videos3| 在国产线视频A在线视频| 国产精品一区二区三区黄| 国产精品亚洲片在线观看麻豆| 国产午夜影视大全免费观看| 人妻少妇精品视频二区| 韩国一级毛片中文字幕| 亚洲熟妇在线视频观看| 久久精品国产亚洲成人av| 日韩人妻少妇一区二区三区| 亚洲の无码国产の无码步美| 97成人碰碰久久人人超级碰oo| www插插插无码视频网站| 亚洲欧美在线观看| 午夜免费无码福利视频麻豆|