1. 源碼為對粒子移動狀態模擬的項目。要求使用多種優化方法,對比串行優化、多線程優化、全部優化下的加速比。
2. 代碼
以下為核心優化代碼及分析
#include <stdlib.h>
#include <iostream>
#include <math.h>
#include "../headers/global.h"
#include "../headers/sphkernel.h"
using namespace std;
//計算距離
void getPairwiseSeparations(double** &pos)
{
// 1. 提出循環不變量
// 2. todo simd
// 3. 對 j 進行循環分塊 影響dx dy dz的命中,不適用
// 4. omp
#if defined(OPT_BASE) && (defined(OPT_SIMD)||defined(OPT_OMP))
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int i = 0; i < N; i++)
{
#ifndef OPT_SIMD
double temp1 = pos[0][i];
double temp2 = pos[1][i];
double temp3 = pos[2][i];
for (int j = 0; j < N; j++)
{
// dx[i][j] = -dx[j][i] 粒子彼此計算相對距離
dx[i][j] = pos[0][j] - temp1;
dy[i][j] = pos[1][j] - temp2;
dz[i][j] = pos[2][j] - temp3;
}
#else
float64x2_t v0 = vld1q_dup_f64(&pos[0][i]);
float64x2_t v1 = vld1q_dup_f64(&pos[1][i]);
float64x2_t v2 = vld1q_dup_f64(&pos[2][i]);
for(int j=0;j<N/2*2;j+=2)
{
float64x2_t v0_0 = vld1q_f64(&pos[0][j]);
float64x2_t v1_0 = vld1q_f64(&pos[1][j]);
float64x2_t v2_0 = vld1q_f64(&pos[2][j]);
vst1q_f64(&dx[i][j],vsubq_f64(v0_0,v0));
vst1q_f64(&dy[i][j],vsubq_f64(v1_0,v1));
vst1q_f64(&dz[i][j],vsubq_f64(v2_0,v2));
}
for (int j = N/2*2; j < N; j++)
{
dx[i][j] = pos[0][j] - pos[0][i];
dy[i][j] = pos[1][j] - pos[1][i];
dz[i][j] = pos[2][j] - pos[2][i];
}
#endif
}
#else
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int i = 0; i < N; i++)
{
for (int j = 0; j < N; j++)
{
// dx[i][j] = -dx[j][i] 粒子彼此計算相對距離
dx[i][j] = pos[0][j] - pos[0][i];
dy[i][j] = pos[1][j] - pos[1][i];
dz[i][j] = pos[2][j] - pos[2][i];
//fprintf(stdout, "%12.6f", dz[i][j]);
//fflush(stdout);
}
//fprintf(stdout,"\n");
}
#endif
}
void getW(double** &dx, double** &dy, double** &dz, const double h)
{
// 1. 循環不變量提出
// 2. omp
#if defined(OPT_OMP) || defined(OPT_BASE)
double value1 = pow((1.0 / (h*sqrt(pi))), 3.0);
double value2 = pow(h,2);
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int i = 0; i < N; i++)
{
for (int j = 0; j < N; j++)
{
r[i][j] = sqrt(pow(dx[i][j],2.0) + pow(dy[i][j],2.0) + pow(dz[i][j],2.0));
W[i][j] = value1 * exp((-pow(r[i][j],2) / value2));
}
}
#else
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int i = 0; i < N; i++)
{
for (int j = 0; j < N; j++)
{
r[i][j] = sqrt(pow(dx[i][j],2.0) + pow(dy[i][j],2.0) + pow(dz[i][j],2.0));
W[i][j] = pow((1.0 / (h*sqrt(pi))), 3.0) * exp((-pow(r[i][j],2) / pow(h,2)));
//fprintf(stdout, "%12.6f", r[i][j]);
//fprintf(stdout, "%12.6f", W[i][j]);
//fflush(stdout);
}
}
#endif
//fprintf(stdout,"\n");
}
void getGradW(double** &dx, double** &dy, double** &dz, const double h)
{
// 1. 循環不變量提出
// 2. omp
#if defined(OPT_OMP) || defined(OPT_BASE)
double value1 = pow(h,2);
double value2 = -2/pow(h,5)/pow(pi,(3/2));
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int i = 0; i < N; i++)
{
for (int j = 0; j < N; j++)
{
r[i][j] = sqrt(pow(dx[i][j],2.0) + pow(dy[i][j],2.0) + pow(dz[i][j],2.0));
gradPara[i][j] = exp(-pow(r[i][j],2) / value1) * value2;
wx[i][j] = gradPara[i][j]*dx[i][j];
wy[i][j] = gradPara[i][j]*dy[i][j];
wz[i][j] = gradPara[i][j]*dz[i][j];
}
}
#else
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
// r[i][j] = r[j][i]
r[i][j] = sqrt(pow(dx[i][j],2.0) + pow(dy[i][j],2.0) + pow(dz[i][j],2.0));
// gradPara[i][j] = gradPara[j][i]
gradPara[i][j] = -2 * exp(-pow(r[i][j],2) / pow(h,2)) / pow(h,5) / pow(pi,(3/2));
// wx[i][j] = -wx[j][i]
wx[i][j] = gradPara[i][j]*dx[i][j];
wy[i][j] = gradPara[i][j]*dy[i][j];
wz[i][j] = gradPara[i][j]*dz[i][j];
//fprintf(stdout, "%12.6f", wy[i][j]);
//fflush(stdout);
}
//fprintf(stdout,"\n");
}
#endif
}
void getDensity(double** &pos, double &m, const double h)
{
getPairwiseSeparations(pos);
getW(dx, dy, dz, h);
// 1. 改變訪問順序
// 2. 內層可 simd
// 3. 外層omp有數據競爭,內層omp偽共享(或者使用cacheline大小為步進), omp 不適用
// 4. 簡化計算公式 rho[j] += W[i][j] rho[i] *= m;W 每一列計算出一個 rho[j] 此處不適用
#ifdef OPT_BASE
for(int i = 0; i < N; i++)
{
for(int j = 0; j < N; j++)
rho[j] += m * W[i][j];
}
#else
for (int j = 0; j < N; j++)
{
for (int i = 0; i < N; i++)
rho[j] += m * W[i][j];
//fprintf(stdout, "%12.6f", rho[j]);
//fflush(stdout);
//fprintf(stdout,"\n");
}
#endif
}
void getPressure(double* &rho, const double k, double &n)
{
// 1. 提出循環不變量,循環展開
// 2. omp 線程調度開銷大,不合適
// 3. simd pow較復雜
#ifdef OPT_BASE
double value = 1+1/n;
for (int j = 0; j < N; j++)
P[j] = k * pow(rho[j], value);
#else
for (int j = 0; j < N; j++)
{
P[j] = k * pow(rho[j], (1+1/n));
//fprintf(stdout, "%12.6f\n", P[j]);
}
#endif
}
void getAcc(double** &pos, double** &vel, double &m, const double h, const double k, double &n, double lmbda, const double nu)
{
getDensity(pos, m, h);
getPressure(rho, k, n);
getPairwiseSeparations(pos);
getGradW(dx, dy, dz, h);
#if defined(OPT_BASE)
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int j = 0; j < N; j++)
{
// 1. wx[i][j] = -wx[j][i] 訪問 w[j][i] 可以增加緩存命中,并且可以向量化
// 如果for循環交換 j i訪問順序,先訪問i 后 j,內層循環無法做向量化優化(內循環每次計算不同的目標元素)
// 并且,如果for循環先訪問j,計算 acc[0][j], P[j]/pow(rho[j],2)是常量可以提出最后計算
// 2. 簡化計算 m* 放在求和之后
// 3. 循環不變量提出
double temp1 = P[j]/pow(rho[j],2);
double temp3 =0.0,temp4=0.0,temp5=0.0;
for (int i = 0; i < N; i++)
{
double temp2 = pow(P[i]/rho[i],2);
temp3 += (temp1 + temp2) * wx[j][i];
temp4 += (temp1 + temp2) * wy[j][i];
temp5 += (temp1 + temp2) * wz[j][i];
}
acc[0][j] += (temp3 *=m);
acc[1][j] += (temp4 *=m);
acc[2][j] += (temp5 *=m);
}
#else
#ifdef OPT_OMP
#pragma omp parallel for schedule(guided) proc_bind(close)
#endif
for (int j = 0; j < N; j++)
{
for (int i = 0; i < N; i++)
{
acc[0][j] -= m * ( P[j]/pow(rho[j],2) + pow(P[i]/rho[i],2) ) * wx[i][j];
acc[1][j] -= m * ( P[j]/pow(rho[j],2) + pow(P[i]/rho[i],2) ) * wy[i][j];
acc[2][j] -= m * ( P[j]/pow(rho[j],2) + pow(P[i]/rho[i],2) ) * wz[i][j];
}
}
#endif
// 1. simd
// 2. 循環合并
#ifdef OPT_BASE
for (int j = 0; j < N; j++)
{
acc[0][j] -= (lmbda * pos[0][j] + nu * vel[0][j]);
acc[1][j] -= (lmbda * pos[1][j] + nu * vel[1][j]);
acc[2][j] -= (lmbda * pos[2][j] + nu * vel[2][j]);
}
#else
for (int j = 0; j < N; j++)
{
acc[0][j] -= lmbda * pos[0][j];
acc[1][j] -= lmbda * pos[1][j];
acc[2][j] -= lmbda * pos[2][j];
}
for (int j = 0; j < N; j++)
{
acc[0][j] -= nu * vel[0][j];
acc[1][j] -= nu * vel[1][j];
acc[2][j] -= nu * vel[2][j];
}
#endif
}
#ifdef OPT_SIMD
// 需要用到 泰勒展開
float64_t exp_(float64_t x)
{
//初始化第一個值
int n = 0;
double prior = 1.0;
double sum = prior; //求和保存結果
while(1)
{
double cur = prior * x /++n;
sum += cur;
prior = cur;
if(cur<=EPSILON)
break;
}
return sum;
}
// a^b = e^(b*ln(a)); neon 未提供ln,設想采用 cmath ln函數,向量化對每個元素的計算用 omp task
// float64_t pow_(float64_t a,float64_t b)
// {
// logf()
// }
#endif
3. 測試數據
3.1 所有編譯優化選項為 O2,不開啟向量自動化優化。代碼內函數計時
| 項目 |
耗時 s |
相比源碼加速比 |
| original(源碼) |
141 |
|
| BASE(循環優化) |
58.7 |
2.4 |
| BASE+SIMD |
51 |
2.8 |
| BASE+OMP |
10 |
14.1 |
| OMP |
23.1 |
6.1 |
| BASE+OMP+SIMD |
6.47 |
21.8 |
3.2 gprof 耗時分析
original

BASE

BASE_SIMD

BASE_OMP

OMP

BASE_SIMD_OMP

4. 結果分析
4.1 程序內計時
(1) 單核串行耗時從 141s 優化為 51s,加速比為 2.8;
(2) 多核32線程耗時 23.1s(實測16線程耗時一致),加速比為 6.1;
(3) 綜合優化后耗時 6.47,加速比為 21.8。
4.2 其他方面
(1) 多線程優化方面 schedule(guided) 策略擁有最高的效率,比 dynamic 略優。
(2) 基礎優化和多線程有較大的加速比提升。
(3) 16線程與32線程的效率一致。
(4) 注意多線程數據競爭問題,可能由于此問題造成多線程效率下降。
(5) 注意cacheline 競爭問題,避免多線程效率下降和計算錯誤。
(6) 注意在不同測試項下確認計算結果的正確性,結算結果應保持一致。
4.3 gprof分析
(1) 使用openmp 會明顯增加線程框架開銷,每增加一個線程的并行使用會增加相應的線程開銷;
(2) getAcc 函數從源碼的時間從 36.9s 最終將為幾乎為 0;

(3) 函數計算的耗時比例從 100% 降為 15.9%,其余 83.78% 為多線程和框架管理帶來的開銷。