<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12
      歡迎大家下載試用折桂單點登錄系統, https://www.zheguisoft.com

      快速導入上億行數據文件到數據庫表(使用 JDBC 的 executeBatch)

      最近在 cnblogs 網站上,看其他人博客,談及一個包含很多行(一億)的大文件,一周之內,將其數據導入到數據庫表。

      我談到可以使用“使用數據庫事務,分批 commit 到數據庫,每批次有 5000行”的方法,提高數據導入速度,兩天應該就可以了。

      好像博主及下方評論者,不太理解,這個“分批 commit ”

      特寫此博客,介紹一下使用 JDBC 的 executeBatch 做分批 commit,以提高大批量數據的導入速度。

       

      JDBC 有個 PreparedStatement 類,包含 addBatch, executeBatch 等函數(或稱之為方法,我不區分這兩個概念)。配合 Connection 的setAutoCommit(false), commit(),即可實現“分批 commit ”

      當然,首先要逐行讀數據文件。這里的數據文件,一般是 .txt 或 .csv 之類的純文本文件,以逗號作為列分割,有的以 tab 做分割字符,也有的使用固定列寬(比如1-4字符為第一列,5-12為第二列...)。

      我們使用 BufferedReader 來實現逐行讀取。這是一個常用的 Java 類,可很好地用于此處文件讀取。

       

      為了方便起見,軟件將從 Java 命令行讀取 JVM 參數,舉例如下:

      -Ddata_file=C:\svn_projects\sgm_small_projects\batch_data_import\data\sample_data_1w.csv -Dfrom_line=1 -Dto_line= -Dbatch_commit_size=5000 -Duse_multi_thread=false

      參數解釋如下:

      data_file 為數據文件;

      from_line 用于指定數據文件中的起始行號,最小值為1,一方面可用于跳過標題行,另一方面,可用于長時間運行過程中,如有中斷,可重新從某行開始;

      to_line 用于指定數據文件中的結束行號,可空;

      batch_commit_size 用于指定每批次的數據行數,可調整,以便測試哪種參數,導入數據最快,此處配置為5000;

      use_multi_thread 用于指定程序是否使用多線程,此參數暫無用處。

       

      大批量數據文件導入,一般的策略為:

      a. 正確的數據,盡量全部導入;

      b. 錯誤的數據,跳過、記錄報錯數據行信息,繼續運行;

      c. 全部導入完成后,分析錯誤的數據,特殊處理。

       

      以下介紹的代碼,可以很好地實現這幾個策略(報錯到批次、行號范圍)。

      運行時有類似如下的日志信息:

      15:49:49.201 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:1
      15:49:51.416 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:1, 原數據文件行[1-5000], 提交成功.
      15:49:51.422 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:2
      15:49:52.306 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:2, 原數據文件行[5001-10000], 提交成功.
      15:49:52.329 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:3
      15:49:53.253 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:3, 原數據文件行[10001-15000], 提交成功.
      15:49:53.277 [main] INFO c.z.b.srv.DatabaseSrv - saveBatchDataInTrasaction begin,iBatchNum:4
      15:49:54.188 [main] INFO c.z.b.srv.DatabaseSrv - 批量 commit,批次號:4, 原數據文件行[15001-20000], 提交成功.

      如果 5001-10000 行處理失敗,則下次運行時,更改啟動參數 from_line=5001, to_line=10000, batch_commit_size 調小為 100,再次運行。

      將出錯行號,逐步定位到更小的批次里。

       

       

      主控程序 BatchDataImportMain ,功能為讀取以上參數,然后使用 reader 讀數據文件,最后調用 dataSrv.saveData ,代碼如下:

      public class BatchDataImportMain {
      
          public static void main(String[] args) {
              Logger log = LoggerFactory.getLogger(BatchDataImportMain.class);
              try {
                  log.info("從命令行參數中獲取數據...");
                  String strDataFile = System.getProperty("data_file");
      
                  // 數據文件的第一行為1,不是0,方便用戶理解
                  String strFromLine = System.getProperty("from_line", "1");
      
                  String strToLine = System.getProperty("to_line");
      
                  MutableObject<Long> iFromLine = null;
                  if (StringUtils.isNotEmpty(strFromLine)) {
                      iFromLine = new MutableObject<Long>();
                      iFromLine.setValue(Long.parseLong(strFromLine));
                  }
      
                  MutableObject<Long> iToLine = null;
                  if (StringUtils.isNotEmpty(strToLine)) {
                      iToLine = new MutableObject<Long>();
                      iToLine.setValue(Long.parseLong(strToLine));
                  }
      
                  String strBatchCommitSize = System.getProperty("batch_commit_size");
                  int iBatchCommitSize = 5000;
                  if (StringUtils.isNotEmpty(strBatchCommitSize)) {
                      iBatchCommitSize = Integer.parseInt(strBatchCommitSize);
                  }
      
                  String strUseMultiThread = System.getProperty("use_multi_thread");
                  boolean bUseMultiThread = false;
                  if (StringUtils.equalsIgnoreCase(strUseMultiThread, "true")) {
                      bUseMultiThread = true;
                  }
      
                  File fDataFile = new File(strDataFile);
      
                  log.info("begin save data from file:" + fDataFile.getAbsolutePath());
                  DataImportSrvBase dataSrv = null;
                  if (bUseMultiThread) {
                      dataSrv = new DataImportSrvUseThread();
                  } else {
                      dataSrv = new DataImportSrvNotUseThread();
                  }
      
                  try (FileInputStream fis = new FileInputStream(fDataFile)) {
                      String charsetName = "gbk";
                      try (InputStreamReader isr = new InputStreamReader(fis, charsetName)) {
                          try (BufferedReader br = new BufferedReader(isr)) {
                              dataSrv.saveData(br, iFromLine, iToLine, iBatchCommitSize, fDataFile.getName());
                          }
                      }
      
                  }
                  log.info("ends save data from file:" + fDataFile.getAbsolutePath());
      
              } catch (Exception err) {
                  log.error(err.getMessage(), err);
              }
          }
      
      }

       

      以上 dataSrv 為不采用多線程的 DataImportSrvNotUseThread,此類的功能,是將 reader 中的數據,逐行取出,每5000行為一批次,調用數據保存代碼。

      內存占用最多為5000行數據,不會導致內存溢出。

      分批時,記錄當前批次的數據中,在原始數據文件中的起始行號、結束行號、當前第幾批。

       

      DataImportSrvNotUseThread 代碼如下:

      public class DataImportSrvNotUseThread extends DataImportSrvBase {
      
          @Override
          public void saveData(BufferedReader br, MutableObject<Long> iFromLine, MutableObject<Long> iToLine,
                  int iBatchCommitSize, String fileName) throws IOException, SQLException {
              String strLine = null;
              // DataLineParseSrv dataSrv = new DataLineParseSrv();
      
              LinkedList<LineString> batchLineDataBufferList = new LinkedList<LineString>();
              long iBatchNum = 0;
              long iLineNumOfFile = 0;
      
              while ((strLine = br.readLine()) != null) {
                  iLineNumOfFile++;
                  // 忽略不在指定行號范圍內的數據行
                  if (iFromLine != null && iFromLine.getValue() > iLineNumOfFile) {
                      continue;
                  }
                  if (iToLine != null && iToLine.getValue() < iLineNumOfFile) {
                      break;
                  }
      
                  // 忽略空行
                  if (StringUtils.isEmpty(strLine)) {
                      continue;
                  }
      
                  // LineData data = dataSrv.parse(line);
                  LineString lineData = new LineString();
                  lineData.strLine = strLine;
                  lineData.lineNumAtFile = iLineNumOfFile;
      
                  batchLineDataBufferList.add(lineData);
      
                  if (batchLineDataBufferList.size() >= iBatchCommitSize) {
                      iBatchNum++;
                      long iLineNumBeginOfBatch = batchLineDataBufferList.getFirst().lineNumAtFile;
                      long iLineNumEndOfBatch = lineData.lineNumAtFile;
                      new DatabaseSrv().saveBatchDataInTrasaction(iBatchNum, iLineNumBeginOfBatch, iLineNumEndOfBatch,
                              batchLineDataBufferList);
                      batchLineDataBufferList = new LinkedList<LineString>();
                  }
              }
      
              if (batchLineDataBufferList.size() > 0) {
                  iBatchNum++;
                  long iLineNumBeginOfBatch = batchLineDataBufferList.getFirst().lineNumAtFile;
                  long iLineNumEndOfBatch = batchLineDataBufferList.getLast().lineNumAtFile;
      
                  new DatabaseSrv().saveBatchDataInTrasaction(iBatchNum, iLineNumBeginOfBatch, iLineNumEndOfBatch,
                          batchLineDataBufferList);
                  batchLineDataBufferList = new LinkedList<LineString>();
              }
          }
      
      }

       

       

      最后,DatabaseSrv 類的 saveBatchDataInTrasaction 函數,保存一批數據,使用一個數據庫連接、一個 transaction. 此函數內部,使用 PreparedStatement 的executeBatch。

      此處使用了數據庫連接池。

      有的數據庫,初次建立連接,用時很長,而使用數據庫連接池,相比未使用數據庫連接池,可大幅提高性能。

       

      DatabaseSrv 代碼如下:

      public class DatabaseSrv {
          static BasicDataSource g_ds = null;
      
          public void saveBatchDataInTrasaction(long iBatchNum, long iLineNumBeginOfBatch, long iLineNumEndOfBatch,
                  List<LineString> batchLineDataBufferList) throws SQLException {
              Logger log = LoggerFactory.getLogger(DatabaseSrv.class);
              log.info("saveBatchDataInTrasaction begin,iBatchNum:" + iBatchNum);
      
              boolean bCommitSuccess = false;
              try {
                  DataLineParseSrv dataSrv = new DataLineParseSrv();
                  BasicDataSource ds = getDataSource();
      
                  try (Connection con = ds.getConnection()) {
                      con.setAutoCommit(false);
                      con.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
      
                      String sql = "insert into tt_test(col_a,col_b,col_c,col_d,col_e,col_f,col_g,col_h,col_i,col_j,col_k,col_l) values(?,?,?,?,?,?,?,?,?,?,?,?);";
                      try (PreparedStatement ps = con.prepareStatement(sql)) {
                          for (LineString d : batchLineDataBufferList) {
                              LineParsedData parsedData = dataSrv.parse(d);
      
                              long iLineNum = d.lineNumAtFile;// 也可以在表中,先增加一列,保存數據行號。以便檢查哪些行成功導入了。
      
                              ps.setString(1, parsedData.a); // 1 is the first ? (1 based counting)
                              ps.setString(2, parsedData.b);
                              ps.setString(3, parsedData.c);
                              ps.setString(4, parsedData.d);
                              ps.setString(5, parsedData.e);
                              ps.setString(6, parsedData.f);
                              ps.setString(7, parsedData.g);
                              ps.setString(8, parsedData.h);
                              ps.setString(9, parsedData.i);
                              ps.setString(10, parsedData.j);
                              ps.setString(11, parsedData.k);
                              ps.setString(12, parsedData.l);
      
                              ps.addBatch();
                          }
                          ps.executeBatch();
      
                          con.commit();
      
                          // 標記為成功
                          bCommitSuccess = true;
      
                          // statement.clearBatch(); //If you want to add more,
                      } catch (Exception err) {
                          log.error(err.getMessage(), err);
                          con.rollback();
                      }
                  }
      
              } catch (Exception err) {
                  log.error(err.getMessage(), err);
              }
      
              if (bCommitSuccess) {
                  log.info("批量 commit,批次號:" + iBatchNum + ", 原數據文件行[" + iLineNumBeginOfBatch + "-" + iLineNumEndOfBatch
                          + "], 提交成功.");
              } else {
                  log.info("批量 commit,批次號:" + iBatchNum + ", 原數據文件行[" + iLineNumBeginOfBatch + "-" + iLineNumEndOfBatch
                          + "], 提交失敗.");
              }
          }
      
          public static BasicDataSource getDataSource() {
              if (g_ds != null) {
                  return g_ds;
              } else {
                  BasicDataSource ds = new BasicDataSource();
                  ds.setDriverClassName("org.postgresql.Driver");
                  ds.setTestOnBorrow(true);
                  ds.setUrl("jdbc:postgresql://192.168.1.50:5432/zg_prt_uld");
                  ds.setValidationQuery("select 1 as a;");
                  ds.setUsername("zg_prt_uld_db_user");
                  ds.setPassword("xxxx");
      
                  ds.setInitialSize(1);
                  ds.setMaxActive(30);
      
                  g_ds = ds;
                  return g_ds;
              }
          }
      
      }

       

       

      還有一些重要性較低的代碼,此處未貼出。如需要,也可提供。

      經初步測試,以上代碼,未使用多線程,導入 2萬行數據,運行三次,用時分別為 5.696秒, 4.968 秒, 5.04 秒。

      按第一次運行的速度(3511行/秒),導入 2 億行數據,順利的話,預估完成導入所用時間為 15.8小時

      即使加上異常數據分析、特殊處理的操作,也能很好完成該博主的工作任務(1周之內完成數據導入)。

       

      當然,此處代碼,仍有性能優化的余地。

       

      以上性能測試,使用的是 Postgres 數據庫,本地無線局域網連接。

       

      ===============歡迎轉載,轉載請注明出處:http://www.rzrgm.cn/jacklondon/

      轉載請注明出處: http://www.rzrgm.cn/jacklondon ; 歡迎訪問 http://www.zheguisoft.com/ 并提建議。
      posted @ 2020-12-26 16:33  杰克倫敦塵  Views(1248)  Comments(2)    收藏  舉報
      歡迎大家下載試用折桂單點登錄系統, https://www.zheguisoft.com
      主站蜘蛛池模板: 亚洲精品久久久久久无码色欲四季| 成人免费亚洲av在线| 欧美性猛交xxxx乱大交丰满| 亚洲爆乳少妇无码激情| 亚洲综合一区无码精品| 丰满岳乱妇久久久| 国产在线午夜不卡精品影院| 凤庆县| 国产在线精品欧美日韩电影| 人妻少妇精品性色av蜜桃| 久久91精品牛牛| 2020国产欧洲精品网站| 综合色综合色综合色综合| 国产国语一级毛片| 人妻夜夜爽天天爽三区麻豆av| 日韩精品一区二区在线视| 亚洲综合天堂一区二区三区| 久久亚洲精品情侣| 国产美女被遭强高潮免费一视频| 日本一区三区高清视频| 好男人社区神马在线观看www| 延吉市| 亚洲Av综合日韩精品久久久| 九九热在线精品视频99| 国产成人精品久久综合| 欧洲亚洲精品免费二区| 99在线小视频| 一区二区亚洲人妻精品| 18av千部影片| 麻豆一区二区中文字幕| 自拍视频亚洲精品在线| 国产欧美国日产高清| av无码一区二区大桥久未| 亚洲区一区二区三区亚洲| 久久精品国产亚洲不av麻豆| 亚洲人成网网址在线看| 精品久久综合日本久久网| 久久av色欲av久久蜜桃网| 人人妻人人澡人人爽曰本| 熟女一区| 精品一区二区三区蜜桃久|