java~并行計算~大集合的并行處理
上一次寫了關(guān)于《FunctionalInterface~一個批量處理數(shù)據(jù)的類》和《Future和Callable實現(xiàn)大任務(wù)的并行處理》的文章,本講主要結(jié)合實際應(yīng)用,來封裝一個集合并行處理組件,我們的集合分為數(shù)據(jù)庫查詢出現(xiàn)的分頁集合;還有一個是內(nèi)存的集合,今天主要說一下內(nèi)存集合的并行處理。
場景介紹
- 有一個比較耗時的工作,將top 400的用戶的行為信息統(tǒng)計
- 統(tǒng)計的信息來自很多業(yè)務(wù),很多服務(wù),不能使用聚合直接計算
- 這些業(yè)務(wù)統(tǒng)計的時間,大概每個人平均需要1秒
- 這些用戶的各種類型,彼此獨立,沒有關(guān)系
如何設(shè)計
如果直接順序?qū)懘a,那1萬的用戶,需要400秒的時間,這是我們不能接受的,我們使用并行編程8秒就把它搞定。
如何實現(xiàn)
- 400的集合,進行拆分,每100個為一組,分為4組(4頁)
- 對每100個集合進行拆分,每2個為1組,將100個分成了50組
- 對50組數(shù)據(jù),開50個線程并行處理,結(jié)果為2行完成
- 400的信息,分成了4頁,每頁2秒,一共8秒
代碼實現(xiàn)
/**
* 數(shù)據(jù)集并行處理工具
*/
public class DataHelper {
/**
* 并行處理線程數(shù)字
*/
static final int THREAD_COUNT = 50;
/**
* 單線程中處理的集合的長度,50個線程,每個線程處理2條,如果處理時間為1S,則需要2S的時間.
*/
static final int INNER_LIST_LENGTH = 2;
static Logger logger = LoggerFactory.getLogger(DataHelper.class);
/**
* 大集合拆分.
*
* @param list
* @param len
* @param <T>
* @return
*/
private static <T> List<List<T>> splitList(List<T> list, int len) {
if (list == null || list.size() == 0 || len < 1) {
return null;
}
List<List<T>> result = new ArrayList<List<T>>();
int size = list.size();
int count = (size + len - 1) / len;
for (int i = 0; i < count; i++) {
List<T> subList = list.subList(i * len, ((i + 1) * len > size ? size : len * (i + 1)));
result.add(subList);
}
return result;
}
/**
* 并行處理.
*
* @param list 大集合
* @param pageSize 單頁數(shù)據(jù)大小
* @param consumer 處理程序
* @param <T>
*/
public static <T> void fillDataByPage(List<T> list,
int pageSize,
Consumer<T> consumer) {
List<List<T>> innerList = new ArrayList<>();
splitList(list, pageSize).forEach(o -> innerList.add(o));
int totalPage = innerList.size();
AtomicInteger i = new AtomicInteger();
innerList.forEach(items -> {
ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
i.getAndIncrement();
Collection<BufferInsert<T>> bufferInserts = new ArrayList<>();
splitList(items, INNER_LIST_LENGTH).forEach(o -> {
bufferInserts.add(new BufferInsert(o, consumer));
});
try {
executor.invokeAll(bufferInserts);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.shutdown();
logger.info("【當(dāng)前數(shù)據(jù)頁:{}/{}】", i.get(), totalPage);
});
}
/**
* 多線程并發(fā)處理數(shù)據(jù).
*
* @param <T>
*/
static class BufferInsert<T> implements Callable<Integer> {
/**
* 要處理的數(shù)據(jù)列表.
*/
List<T> items;
/**
* 處理程序.
*/
Consumer<T> consumer;
public BufferInsert(List<T> items, Consumer<T> consumer) {
this.items = items;
this.consumer = consumer;
}
@Override
public Integer call() {
for (T item : items) {
this.consumer.accept(item);
}
return 1;
}
}
}
調(diào)用代碼
/**
* 8秒處理400個任務(wù),每個任務(wù)執(zhí)行時間為1S,并行的威力
*/
@Test
public void test() {
List<Integer> sumList = new ArrayList<>();
for (int i = 0; i < 400; i++) {
sumList.add(i);
}
StopWatch stopWatch = new StopWatch();
stopWatch.start();
DataHelper.fillDataByPage(sumList, 100, (o) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
stopWatch.stop();
System.out.println("time:" + stopWatch.getTotalTimeMillis());
}
結(jié)果截圖

浙公網(wǎng)安備 33010602011771號