<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      在互聯網設計架構過程中,日志異步落庫,儼然已經是高并發環節中不可缺少的一環。為什么說是高并發環節中不可缺少的呢? 原因在于,如果直接用mq進行日志落庫的時候,低并發下,生產端生產數據,然后由消費端異步落庫,是沒有什么問題的,而且性能也都是異常的好,估計tp99應該都在1ms以內。但是一旦并發增長起來,慢慢的你就發現生產端的tp99一直在增長,從1ms,變為2ms,4ms,直至send timeout。尤其在大促的時候,我司的系統就經歷過這個情況,當時mq的發送耗時超過200ms,甚至一度有不少timeout產生。

      考慮到這種情況在高并發的情況下才出現,所以今天我們就來探索更加可靠的方法來進行異步日志落庫,保證所使用的方式不會因為過高的并發而出現接口ops持續下降甚至到不可用的情況。

       

      方案一: 基于log4j的異步appender實現

      此種方案,依賴于log4j。在log4j的異步appender中,通過mq進行生產消費入庫。相當于在接口和mq之間建立了一個緩沖區,使得接口和mq的依賴分離,從而不讓mq的操作影響接口的ops。

      此種方案由于使用了異步方式,且由于異步的discard policy策略,當大量數據過來,緩沖區滿了之后,會拋棄部分數據。此種方案適用于能夠容忍數據丟失的業務場景,不適用于對數據完整有嚴格要求的業務場景。

      來看看具體的實現方式:

      首先,我們需要自定義一個Appender,繼承自log4j的AppenderSkeleton類,實現方式如下:

      public class AsyncJmqAppender extends AppenderSkeleton {
      
          @Resource(name = "messageProducer")
          private MessageProducer messageProducer;
      
          @Override
          protected void append(LoggingEvent loggingEvent) {
              asyncPushMessage(loggingEvent.getMessage());
          }
      
          /**
           * 異步調用jmq輸出日志
           * @param message
           */
          private void asyncPushMessage(Object message) {
      
              CompletableFuture.runAsync(() -> {
      
                  Message messageConverted = (Message) message;
      
                  try {
                      messageProducer.send(messageConverted);
                  } catch (JMQException e) {
                      e.printStackTrace();
                  }
      
              });
          }
      
      
          @Override
          public boolean requiresLayout() {
              return false;
          }
      
          @Override
          public void close() {
      
          }
      }
      

      然后在log4j.xml中,為此類進行配置:

      <!--異步JMQ appender-->
      <appender name="async_mq_appender" class="com.jd.limitbuy.common.util.AsyncJmqAppender">
          <!-- 設置File參數:日志輸出文件名 -->
          <param name="File" value="D:/export/Instances/order/server1/logs/order.async.jmq" />
          <!-- 設置是否在重新啟動服務時,在原有日志的基礎添加新日志 -->
          <param name="Append" value="true" />
          <!-- 設置文件大小 -->
          <param name="MaxFileSize" value="10KB" />
          <!-- 設置文件備份 -->
          <param name="MaxBackupIndex" value="10000" />
          <!-- 設置輸出文件項目和格式 -->
          <layout class="org.apache.log4j.PatternLayout">
              <param name="ConversionPattern" value="%m%n" />
          </layout>
      </appender>
      <logger name="async_mq_appender_logger">
          <appender-ref ref="async_mq_appender"/>
      </logger>
      

      最后就可以按照如下的方式進行正常使用了:

      private static Logger logger = LoggerFactory.getLogger("filelog_appender_logger");
      

      注意: 此處需要注意log4j的一個性能問題。在log4j的conversionPattern中,匹配符最好不要出現 C% L%通配符,壓測實踐表明,這兩個通配符會導致log4j打日志的效率降低10倍。

      方案一很簡便,且剝離了接口直接依賴mq導致的性能問題。但是無法解決數據丟失的問題(但是我們其實可以在本地搞個策略落盤來不及處理的數據,可以大大的減少數據丟失的幾率)。但是很多的業務場景,是需要數據不丟失的,所以這就衍生出我們的另一套方案來。

       

      方案二:增量消費log4j日志

      此種方式,是開啟worker在后臺增量消費log4j的日志信息,和接口完全脫離。此種方式相比方案一,可以保證數據的不丟失,且可以做到完全不影響接口的ops。但是此種方式,由于是后臺worker在后臺啟動進行掃描,會導致落庫的數據慢一些,比如一分鐘之后才落庫完畢。所以適用于對落庫數據實時性不高的場景。

      具體的實現步驟如下:

      首先,將需要進行增量消費的日志統一打到一個文件夾,以天為單位,每天生成一個帶時間戳日志文件。由于log4j不支持直接帶時間戳的日志文件生成,所以這里需要引入log4j.extras組件,然后配置log4j.xml如下:

      image

      之后在代碼中的申明方式如下:

      private static Logger businessLogger = LoggerFactory.getLogger("file_rolling_logger");

      最后在需要記錄日志的地方使用方式如下:

      businessLogger.error(JsonUtils.toJSONString(myMessage))
      

      這樣就可以將日志打印到一個單獨的文件中,且按照日期,每天生成一個。

      然后,當日志文件生成完畢后,我們就可以開啟我們的worker進行增量消費了,這里的增量消費方式,我們選擇RandomAccessFile這個類來進行,由于其獨特的位點讀取方式,可以使得我們非常方便的根據位點的位置來消費增量文件,從而避免了逐行讀取這種低效率的實現方式。

      注意,為每個日志文件都單獨創建了一個位點文件,里面存儲了對應的文件的位點讀取信息。當worker掃描開始的時候,會首先讀取位點文件里面的位點信息,然后找到相應的日志文件,從位點信息位置開始進行消費。這就是整個增量消費worker的核心。具體代碼實現如下(代碼太長,做了折疊):

      /**
       * @Description: 增量日志掃描worker
       * @Detail: 此worker主要用來掃描增量日志,日志本身會在不停的插入中,此worker會不停的掃描此日志來將數據上傳到kafka集群
       * @date 2018-04-08 10:30
       */
      public class LimitBuyScanWorker {
      
          /**
           * 日志和位點文件保存的目錄
           */
          private static final String FILE_DIRECTORY = "D:\\export\\Instances\\order\\server1\\logs\\";
      
          /**
           * 每次步進的長度,此處為1000行
           */
          private static final int SCAN_STEP = 1000;
      
          /**
           * 日志文件名前綴
           */
          private static final String LOG_FILE_PREFIX = "limitbuy.soa.order.";
      
          /**
           * 位點文件名后綴
           */
          private static final String OFT_FILE_APPENDIX = ".offset";
      
          public void logScanner() {
      
              //當前時間
              Date currentDate = new Date();
      
              //今日
              String currentDay = DateUtil.formatDate("yyyy-MM-dd", currentDate);
              //今日日志文件路徑
              String currentLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay;
              logger.error("今日的日志文件路徑:" + currentLogFilePath);
              //今日位點文件路徑
              String currentOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + currentDay + OFT_FILE_APPENDIX;
      
              //昨日
              String yesterDay = DateUtil.formatDate("yyyy-MM-dd", DateUtil.queryPlusDay(currentDate, -1));
              //昨日日志文件路徑
              String yesterdayLogFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay;
              logger.error("昨日的日志文件路徑:" + yesterdayLogFilePath);
              //昨日位點文件路徑
              String yesterdayOffsetFilePath = FILE_DIRECTORY + LOG_FILE_PREFIX + yesterDay + OFT_FILE_APPENDIX;
      
              //先檢測昨日位點和文件體積是否一致,不一致則代表未消費完畢
              boolean yesterdayConsumedOK = checkIfConsumeOK(yesterdayLogFilePath, yesterdayOffsetFilePath);
              logger.error("昨日的日志文件已被消費完畢:" + yesterdayConsumedOK);
      
              //昨日的文件已掃描完畢
              if (yesterdayConsumedOK) {
                  //掃描并消費今日增量日志
                  scanAndConsumeLog(currentLogFilePath, currentOffsetFilePath);
              }
              //昨日的文件未掃描完畢
              else {
                  //掃描并消費昨日增量日志
                  scanAndConsumeLog(yesterdayLogFilePath, yesterdayOffsetFilePath);
              }
          }
      
          /**
           * 檢測日志是否被掃描消費完畢,true:消費完畢;false:未消費完畢
           * @Description 此舉主要防止log4j在零點大促開始的時候,突然的滾動文件造成的部分增量日志不會被消費的問題
           * @param logFilePath
           * @param offsetFilePath
           */
          private boolean checkIfConsumeOK(String logFilePath, String offsetFilePath) {
              try {
      
                  //打開文件
                  RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");
      
                  //得到當前位點
                  long currentOffset = checkOffset(offsetFilePath);
      
                  //得到文件總長
                  long currentFileLength = randomAccessFile.length();
      
                  //比對
                  if (currentOffset >= currentFileLength) {
                      return true;
                  }
                  return false;
              } catch (FileNotFoundException e) {
                  logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(FileNotFoundException):", e);
                  AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage());
                  return false;
              } catch (IOException e) {
                  logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(IOException):", e);
                  AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage());
                  return false;
              }
          }
      
          /**
           * 掃描并消費增量日志
           * @param logFilePath
           * @param offsetFilePath
           */
          private void scanAndConsumeLog(String logFilePath, String offsetFilePath) {
              try {
      
                  RandomAccessFile randomAccessFile = new RandomAccessFile(logFilePath, "r");
      
                  //得到當前位點
                  long currentOffset = checkOffset(offsetFilePath);
                  logger.error("開始位點==>" + currentOffset);
      
                  //重置位點到當前位點
                  if (currentOffset <= randomAccessFile.length()) {
                      randomAccessFile.seek(currentOffset);
                  }
      
                  //讀取@SCAN_STEP行
                  for (long i = currentOffset; i < currentOffset + SCAN_STEP; i++) {
                      //得到行
                      String result = randomAccessFile.readLine();
                      //如果內容不為空
                      if (StringUtil.isNotBlank(result)) {
                          //TODO 邏輯實現
                      }
                  }
      
                  //讀取@SCAN_STEP行之后的位點
                  logger.error("讀取" + SCAN_STEP + "行之后位點==>" + randomAccessFile.getFilePointer());
      
                  //如果update不成功,可以不處理,后面掃描進來重新過一遍即可
                  updateOffset(randomAccessFile.getFilePointer(), offsetFilePath);
      
                  logger.error("文件總長==>" + randomAccessFile.length());
      
              } catch (FileNotFoundException e) {
                  logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(FileNotFoundException):", e);
                  AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage());
              } catch (IOException e) {
                  logger.error("com.jd.limitbuy.service.worker.logScanner 出錯(IOException):", e);
                  AlarmUtil.alarm("com.jd.limitbuy.service.worker.logScanner 出錯:" + e.getMessage());
              }
          }
      
          /**
           * 校驗位點
           *     不存在則創建并賦值為0
           *     已存在則更新位點
           * @param offsetFilePath
           * @return
           * @throws IOException
           */
          private long checkOffset(String offsetFilePath) throws IOException {
      
              File offsetFile = new File(offsetFilePath);
      
              //如果位點文件不存在,則創建位點文件并返回0
              if (!offsetFile.exists()) {
                  updateOffset(0, offsetFilePath);
                  return 0;
              }
              //如果位點文件存在,則返回位點文件內容
              else {
                  FileReader fileReader = new FileReader(offsetFilePath);
                  StringBuilder stringBuilder = new StringBuilder();
                  char[] bytesChar = new char[50];
                  fileReader.read(bytesChar);
                  fileReader.close();
      
                  for (char c : bytesChar) {
                      stringBuilder.append(c);
                  }
      
                  String filteredOffset = stringBuilder.toString().trim();
      
                  if (StringUtil.isNotBlank(filteredOffset)) {
                      return Long.parseLong(filteredOffset);
                  } else {
                      return 0;
                  }
              }
          }
      
          /**
           * 更新位點信息
           * @param offset
           * @param offsetFilePath
           */
          private void updateOffset(long offset, String offsetFilePath) throws IOException {
              FileWriter fileWriter = new FileWriter(offsetFilePath);
              fileWriter.write(offset + "");
              fileWriter.flush();
              fileWriter.close();
          }
      }

      此種方式由于worker掃描是每隔一段時間啟動一次進行消費,所以導致數據從產生到入庫,可能經歷時間超過一分鐘以上,但是在一些對數據延遲要求比較高的業務場景,比如庫存扣減,是不能容忍的,所以這里我們就引申出第三種做法,基于內存文件隊列的異步日志消費。

       

      方案三:基于內存文件隊列的異步日志消費

      由于方案一和方案二都嚴重依賴log4j,且方案本身都存在著要么丟數據,要么入庫時間長的缺點,所以都并不是那么盡如人意。但是本方案的做法,既解決了數據丟失的問題,又解決了數據入庫時間被拉長的尷尬,所以是終極解決之道。而且在大促銷過程中,此種方式經歷了實戰檢驗,可以大面積的推廣使用。

      此方案中提到的內存文件隊列,是我司自研的一款基于RandomAccessFile和MappedByteBuffer實現的內存文件隊列。隊列核心使用了ArrayBlockingQueue,并提供了produce方法,進行數據入管道操作,提供了consume方法,進行數據出管道操作。而且后臺有一個worker一直啟動著,每隔5ms或者遍歷了100條數據之后,就將數據落盤一次,以防數據丟失。具體的設計,就這么多,感興趣的可以根據我提供的信息,自己實踐一下。

      由于有此中間件的加持,數據生產的時候,只需要入壓入管道,然后消費端進行消費即可。未被消費的數據,會進行落盤操作,謹防數據丟失。當大促的時候,大量數據涌來的時候,管道滿了的情況下會阻塞接口,數據不會被拋棄。雖然可能會導致接口在那一瞬間無響應,但是由于有落盤操作和消費操作(此操作操控的是JVM堆外內存數據,不受GC的影響,所以不會出現操作暫停的情況,為什么呢?因為用了MappedByteBuffer),此種阻塞并未影響到接口整體的ops。

      在實際使用的時候,ArrayBlockingQueue作為核心隊列,顯然是全局加鎖的,后續我們考慮升級為無鎖隊列,所以將會參考Netty中的有界無鎖隊列:MpscArrayQueue。預計性能將會再好一些。

      受限于公司政策,我僅提供大致思路,但是不會提供具體代碼,有問題評論區交流吧。

       

      上面就是在進行異步日志消費的時候,我所經歷的三個階段,并且一步一步的優化到目前的方式。雖然過程曲折,但是結果令人歡欣鼓舞。如果喜歡就給個推薦,后續我將會持續更新你所不知道的系列,以期達到拋磚引玉的效果。

      posted on 2018-06-17 16:04  程序詩人  閱讀(14667)  評論(49)    收藏  舉報
      主站蜘蛛池模板: 国产精品成人久久电影| 国产精品偷伦费观看一次| 女人腿张开让男人桶爽| 五月花成人网| av日韩在线一区二区三区| 亚洲AV成人片不卡无码| 亚洲av成人区国产精品| 国产精品中文字幕视频| 亚洲精品无码久久一线| 亚洲人妻一区二区精品| 免费AV手机在线观看片| 国产剧情视频一区二区麻豆| 精品综合久久久久久97| 久久这里只精品热免费99| 五月婷婷激情第四季| 九九热精品在线视频观看| 最新国产精品亚洲| 亚洲一区在线成人av| 免费视频爱爱太爽了| 在线欧美中文字幕农村电影| 又大又硬又爽免费视频| 亚洲欧美国产精品专区久久| 国产成人久久精品二三区| 庆安县| 色悠悠国产精品免费观看| 国产久免费热视频在线观看| 特级欧美AAAAAAA免费观看| 蜜臀精品视频一区二区三区| 日本中文字幕不卡在线一区二区 | 国产精品老熟女露脸视频| 成A人片亚洲日本久久| 亚洲欧美日韩在线不卡| 最新亚洲春色av无码专区| 日韩视频一区二区三区视频| 久久亚洲精品无码播放| 久久这里有精品国产电影网| 国产精品人成视频免费国产| 中文字幕亚洲无线码一区女同| 国产系列高清精品第一页| 亚洲成熟女人毛毛耸耸多| 女人香蕉久久毛毛片精品|